This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7bf00fb19 [feature][connector][cdc] add cdc reader jdbc related (#3433)
7bf00fb19 is described below
commit 7bf00fb19f62ef21c9d1c93da7cf256d18a2e4df
Author: ic4y <[email protected]>
AuthorDate: Tue Nov 15 15:25:26 2022 +0800
[feature][connector][cdc] add cdc reader jdbc related (#3433)
* [feature][connector] add cdc reader jdbc related
* [feature][connector] fix check style
---
.../cdc/base/config/JdbcSourceConfig.java | 134 ++++++++++++++++
.../cdc/base/dialect/JdbcDataSourceDialect.java | 67 ++++++++
.../base/relational/JdbcSourceEventDispatcher.java | 93 +++++++++++
.../relational/connection/ConnectionPoolId.java | 60 +++++++
.../relational/connection/ConnectionPools.java | 31 ++++
.../connection/JdbcConnectionFactory.java | 84 ++++++++++
.../connection/JdbcConnectionPoolFactory.java | 67 ++++++++
.../relational/connection/JdbcConnectionPools.java | 59 +++++++
.../external/JdbcSourceFetchTaskContext.java | 176 +++++++++++++++++++++
9 files changed, 771 insertions(+)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java
new file mode 100644
index 000000000..087268ca9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.config;
+
+import io.debezium.relational.RelationalDatabaseConnectorConfig;
+import org.seatunnel.connectors.cdc.base.source.IncrementalSource;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Base configuration of an {@link IncrementalSource} that is backed by a JDBC data source.
+ *
+ * <p>Stores the JDBC driver, endpoint, credentials, captured database/table lists and the
+ * connection tuning values handed to the constructor, and exposes each one through a getter.
+ * Subclasses provide the Debezium connector configuration via {@link #getDbzConnectorConfig()}.
+ */
+public abstract class JdbcSourceConfig extends BaseSourceConfig {
+
+    // --- JDBC endpoint and credentials ---
+    protected final String driverClassName;
+    protected final String hostname;
+    protected final int port;
+    protected final String username;
+    protected final String password;
+
+    // --- capture scope (the lists are stored by reference, not copied) ---
+    protected final List<String> databaseList;
+    protected final List<String> tableList;
+
+    // --- fetch / connection tuning ---
+    protected final int fetchSize;
+    protected final String serverTimeZone;
+    protected final Duration connectTimeout;
+    protected final int connectMaxRetries;
+    protected final int connectionPoolSize;
+
+    /**
+     * Creates the configuration.
+     *
+     * <p>{@code startupConfig}, {@code stopConfig}, {@code splitSize}, both distribution factors
+     * and {@code dbzProperties} are forwarded unchanged to {@link BaseSourceConfig}; every other
+     * argument is kept in the field of the same name.
+     */
+    public JdbcSourceConfig(
+            StartupConfig startupConfig,
+            StopConfig stopConfig,
+            List<String> databaseList,
+            List<String> tableList,
+            int splitSize,
+            double distributionFactorUpper,
+            double distributionFactorLower,
+            Properties dbzProperties,
+            String driverClassName,
+            String hostname,
+            int port,
+            String username,
+            String password,
+            int fetchSize,
+            String serverTimeZone,
+            Duration connectTimeout,
+            int connectMaxRetries,
+            int connectionPoolSize) {
+        super(
+                startupConfig,
+                stopConfig,
+                splitSize,
+                distributionFactorUpper,
+                distributionFactorLower,
+                dbzProperties);
+        // Assigned in constructor-parameter order.
+        this.databaseList = databaseList;
+        this.tableList = tableList;
+        this.driverClassName = driverClassName;
+        this.hostname = hostname;
+        this.port = port;
+        this.username = username;
+        this.password = password;
+        this.fetchSize = fetchSize;
+        this.serverTimeZone = serverTimeZone;
+        this.connectTimeout = connectTimeout;
+        this.connectMaxRetries = connectMaxRetries;
+        this.connectionPoolSize = connectionPoolSize;
+    }
+
+    /** Returns the Debezium relational connector configuration built by the subclass. */
+    public abstract RelationalDatabaseConnectorConfig getDbzConnectorConfig();
+
+    /** Returns the JDBC driver class name. */
+    public String getDriverClassName() {
+        return this.driverClassName;
+    }
+
+    /** Returns the database host name. */
+    public String getHostname() {
+        return this.hostname;
+    }
+
+    /** Returns the database port. */
+    public int getPort() {
+        return this.port;
+    }
+
+    /** Returns the user name used to connect. */
+    public String getUsername() {
+        return this.username;
+    }
+
+    /** Returns the password used to connect. */
+    public String getPassword() {
+        return this.password;
+    }
+
+    /** Returns the database list given to the constructor (same instance, not a copy). */
+    public List<String> getDatabaseList() {
+        return this.databaseList;
+    }
+
+    /** Returns the table list given to the constructor (same instance, not a copy). */
+    public List<String> getTableList() {
+        return this.tableList;
+    }
+
+    /** Returns the configured fetch size. */
+    public int getFetchSize() {
+        return this.fetchSize;
+    }
+
+    /** Returns the configured server time zone. */
+    public String getServerTimeZone() {
+        return this.serverTimeZone;
+    }
+
+    /** Returns the configured connect timeout. */
+    public Duration getConnectTimeout() {
+        return this.connectTimeout;
+    }
+
+    /** Returns the configured maximum number of connect retries. */
+    public int getConnectMaxRetries() {
+        return this.connectMaxRetries;
+    }
+
+    /** Returns the configured connection pool size. */
+    public int getConnectionPoolSize() {
+        return this.connectionPoolSize;
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
new file mode 100644
index 000000000..d17f0267a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.dialect;
+
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.TableId;
+import org.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+import
org.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionFactory;
+import
org.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
+import org.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
+import
org.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+
+import java.util.List;
+
+/** A {@link DataSourceDialect} bound to {@link JdbcSourceConfig}, i.e. to JDBC data sources. */
+public interface JdbcDataSourceDialect extends DataSourceDialect<JdbcSourceConfig> {
+
+    /**
+     * Discovers the list of tables to capture.
+     *
+     * @param sourceConfig the JDBC source configuration.
+     * @return the identifiers of the discovered tables.
+     */
+    @Override
+    List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig);
+
+    /**
+     * Builds a {@link JdbcConnection} whose connections come from a {@link JdbcConnectionFactory}
+     * (created with {@link #getPooledDataSourceFactory()}) and opens it.
+     *
+     * @param sourceConfig a basic source configuration.
+     * @return the opened connection utility.
+     * @throws SeaTunnelException wrapping any exception raised while connecting.
+     */
+    default JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
+        final JdbcConnection connection =
+                new JdbcConnection(
+                        sourceConfig.getDbzConfiguration(),
+                        new JdbcConnectionFactory(sourceConfig, getPooledDataSourceFactory()));
+        try {
+            connection.connect();
+            return connection;
+        } catch (Exception e) {
+            throw new SeaTunnelException(e);
+        }
+    }
+
+    /** Returns the connection pool factory used to create the connection pool. */
+    JdbcConnectionPoolFactory getPooledDataSourceFactory();
+
+    /** Creates the fetch task for the given split. */
+    @Override
+    FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase);
+
+    /** Creates the JDBC fetch task context for the given split and task configuration. */
+    @Override
+    JdbcSourceFetchTaskContext createFetchTaskContext(
+            SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig);
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/JdbcSourceEventDispatcher.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/JdbcSourceEventDispatcher.java
new file mode 100644
index 000000000..4cf92927c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/JdbcSourceEventDispatcher.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.relational;
+
+import io.debezium.config.CommonConnectorConfig;
+import io.debezium.connector.base.ChangeEventQueue;
+import io.debezium.pipeline.DataChangeEvent;
+import io.debezium.pipeline.EventDispatcher;
+import io.debezium.pipeline.source.spi.EventMetadataProvider;
+import io.debezium.pipeline.spi.ChangeEventCreator;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.schema.DataCollectionFilters;
+import io.debezium.schema.DatabaseSchema;
+import io.debezium.schema.TopicSelector;
+import io.debezium.util.SchemaNameAdjuster;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import
org.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent;
+import org.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
+
+import java.util.Map;
+
+/**
+ * A subclass implementation of {@link EventDispatcher} for {@link TableId} collections.
+ *
+ * <pre>
+ *  1. Keeps a reference to the {@link ChangeEventQueue} handed to the constructor and exposes
+ *     it through {@link #getQueue()}; per the original notes the queue is shared between
+ *     multiple readers.
+ *  2. Adds {@link #dispatchWatermarkEvent} to put watermark records straight onto that queue.
+ *  3. NOTE(review): the original notes also mention dispatching {@link HistoryRecord}
+ *     directly; no such override is present in this class - confirm.
+ * </pre>
+ */
+public class JdbcSourceEventDispatcher extends EventDispatcher<TableId> {
+
+    /** Queue that receives the watermark events; the same instance is given to the superclass. */
+    private final ChangeEventQueue<DataChangeEvent> queue;
+
+    /** Primary topic of the topic selector, used as the topic of watermark records. */
+    private final String topic;
+
+    /**
+     * Forwards every argument to {@link EventDispatcher} and remembers the queue and the primary
+     * topic of {@code topicSelector}.
+     */
+    public JdbcSourceEventDispatcher(
+            CommonConnectorConfig connectorConfig,
+            TopicSelector<TableId> topicSelector,
+            DatabaseSchema<TableId> schema,
+            ChangeEventQueue<DataChangeEvent> queue,
+            DataCollectionFilters.DataCollectionFilter<TableId> filter,
+            ChangeEventCreator changeEventCreator,
+            EventMetadataProvider metadataProvider,
+            SchemaNameAdjuster schemaNameAdjuster) {
+        super(
+                connectorConfig,
+                topicSelector,
+                schema,
+                queue,
+                filter,
+                changeEventCreator,
+                metadataProvider,
+                schemaNameAdjuster);
+        this.topic = topicSelector.getPrimaryTopic();
+        this.queue = queue;
+    }
+
+    /** Returns the queue this dispatcher was created with. */
+    public ChangeEventQueue<DataChangeEvent> getQueue() {
+        return this.queue;
+    }
+
+    /**
+     * Creates a watermark record for the given split and enqueues it as a {@link
+     * DataChangeEvent}.
+     *
+     * @param sourcePartition source partition stored in the record.
+     * @param sourceSplit split whose id is stored in the record.
+     * @param watermark offset carried by the watermark.
+     * @param watermarkKind kind of the watermark.
+     * @throws InterruptedException if enqueueing is interrupted.
+     */
+    public void dispatchWatermarkEvent(
+            Map<String, ?> sourcePartition,
+            SourceSplitBase sourceSplit,
+            Offset watermark,
+            WatermarkKind watermarkKind)
+            throws InterruptedException {
+        this.queue.enqueue(
+                new DataChangeEvent(
+                        WatermarkEvent.create(
+                                sourcePartition,
+                                this.topic,
+                                sourceSplit.splitId(),
+                                watermarkKind,
+                                watermark)));
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/ConnectionPoolId.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/ConnectionPoolId.java
new file mode 100644
index 000000000..7074c9ec7
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/ConnectionPoolId.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.relational.connection;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * The connection pool identifier.
+ *
+ * <p>Two identifiers are equal when host, port and user name are all equal.
+ */
+public class ConnectionPoolId implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String host;
+    private final int port;
+    private final String username;
+
+    /** Creates an identifier from the given host, port and user name. */
+    public ConnectionPoolId(String host, int port, String username) {
+        this.host = host;
+        this.port = port;
+        this.username = username;
+    }
+
+    /** Compares host, port and user name; any non-{@code ConnectionPoolId} is unequal. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o instanceof ConnectionPoolId) {
+            final ConnectionPoolId other = (ConnectionPoolId) o;
+            return port == other.port
+                    && Objects.equals(host, other.host)
+                    && Objects.equals(username, other.username);
+        }
+        return false;
+    }
+
+    /** Hash over host, port and user name, consistent with {@link #equals(Object)}. */
+    @Override
+    public int hashCode() {
+        return Objects.hash(host, port, username);
+    }
+
+    /** Renders the identifier as {@code username@host:port}. */
+    @Override
+    public String toString() {
+        return new StringBuilder()
+                .append(username)
+                .append('@')
+                .append(host)
+                .append(':')
+                .append(port)
+                .toString();
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/ConnectionPools.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/ConnectionPools.java
new file mode 100644
index 000000000..f641ae0fc
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/ConnectionPools.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.relational.connection;
+
+import org.seatunnel.connectors.cdc.base.config.SourceConfig;
+
+/**
+ * A pool collection that consists of multiple connection pools.
+ *
+ * @param <P> the connection pool type handed back to callers.
+ * @param <C> the source configuration type passed along when a pool is requested.
+ */
+
+public interface ConnectionPools<P, C extends SourceConfig> {
+
+ /**
+ * Gets the connection pool identified by {@code poolId} from the pools, creating a new pool
+ * if it does not exist in the connection pools yet.
+ *
+ * @param poolId identifier of the requested pool.
+ * @param sourceConfig source configuration supplied with the request.
+ * @return the connection pool for {@code poolId}.
+ */
+ P getOrCreateConnectionPool(ConnectionPoolId poolId, C sourceConfig);
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionFactory.java
new file mode 100644
index 000000000..efc1e4baf
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.relational.connection;
+
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+
+import com.zaxxer.hikari.HikariDataSource;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import org.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/** A factory to create JDBC connection. */
+public class JdbcConnectionFactory implements JdbcConnection.ConnectionFactory
{
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JdbcConnectionFactory.class);
+
+ private final JdbcSourceConfig sourceConfig;
+ private final JdbcConnectionPoolFactory jdbcConnectionPoolFactory;
+
+ public JdbcConnectionFactory(
+ JdbcSourceConfig sourceConfig, JdbcConnectionPoolFactory
jdbcConnectionPoolFactory) {
+ this.sourceConfig = sourceConfig;
+ this.jdbcConnectionPoolFactory = jdbcConnectionPoolFactory;
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @Override
+ public Connection connect(JdbcConfiguration config) throws SQLException {
+ final int connectRetryTimes = sourceConfig.getConnectMaxRetries();
+
+ final ConnectionPoolId connectionPoolId =
+ new ConnectionPoolId(
+ sourceConfig.getHostname(),
+ sourceConfig.getPort(),
+ sourceConfig.getUsername());
+
+ HikariDataSource dataSource =
+ JdbcConnectionPools.getInstance(jdbcConnectionPoolFactory)
+ .getOrCreateConnectionPool(connectionPoolId,
sourceConfig);
+
+ int i = 0;
+ while (i < connectRetryTimes) {
+ try {
+ return dataSource.getConnection();
+ } catch (SQLException e) {
+ if (i < connectRetryTimes - 1) {
+ try {
+ Thread.sleep(300);
+ } catch (InterruptedException ie) {
+ throw new SeaTunnelException(
+ "Failed to get connection, interrupted while
doing another attempt",
+ ie);
+ }
+ LOG.warn("Get connection failed, retry times {}", i + 1);
+ } else {
+ LOG.error("Get connection failed after retry {} times", i
+ 1);
+ throw new SeaTunnelException(e);
+ }
+ }
+ i++;
+ }
+ return dataSource.getConnection();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPoolFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPoolFactory.java
new file mode 100644
index 000000000..6152a55b4
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPoolFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.relational.connection;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+
+/** A connection pool factory to create pooled DataSource {@link HikariDataSource}. */
+public abstract class JdbcConnectionPoolFactory {
+
+    public static final String CONNECTION_POOL_PREFIX = "connection-pool-";
+    public static final String SERVER_TIMEZONE_KEY = "serverTimezone";
+    public static final int MINIMUM_POOL_SIZE = 1;
+
+    /**
+     * Builds a {@link HikariDataSource} from the given source configuration.
+     *
+     * <p>The pool is named {@code connection-pool-<hostname>:<port>}, uses the URL returned by
+     * {@link #getJdbcUrl(JdbcSourceConfig)}, keeps at least {@link #MINIMUM_POOL_SIZE} idle
+     * connection and grows up to the configured connection pool size.
+     *
+     * @param sourceConfig configuration providing endpoint, credentials, driver and pool size.
+     * @return the newly created pooled data source.
+     */
+    public HikariDataSource createPooledDataSource(JdbcSourceConfig sourceConfig) {
+        final HikariConfig hikariConfig = new HikariConfig();
+
+        final String poolName =
+                CONNECTION_POOL_PREFIX
+                        + sourceConfig.getHostname()
+                        + ":"
+                        + sourceConfig.getPort();
+        hikariConfig.setPoolName(poolName);
+        hikariConfig.setJdbcUrl(getJdbcUrl(sourceConfig));
+        hikariConfig.setUsername(sourceConfig.getUsername());
+        hikariConfig.setPassword(sourceConfig.getPassword());
+        hikariConfig.setMinimumIdle(MINIMUM_POOL_SIZE);
+        hikariConfig.setMaximumPoolSize(sourceConfig.getConnectionPoolSize());
+        hikariConfig.setConnectionTimeout(sourceConfig.getConnectTimeout().toMillis());
+        hikariConfig.addDataSourceProperty(SERVER_TIMEZONE_KEY, sourceConfig.getServerTimeZone());
+        hikariConfig.setDriverClassName(sourceConfig.getDriverClassName());
+
+        // optional optimization configurations for pooled DataSource
+        hikariConfig.addDataSourceProperty("cachePrepStmts", "true");
+        hikariConfig.addDataSourceProperty("prepStmtCacheSize", "250");
+        hikariConfig.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
+
+        return new HikariDataSource(hikariConfig);
+    }
+
+    /**
+     * Generates the JDBC URL for the specific database, because the URL forms of relational
+     * databases differ, e.g.
+     *
+     * <ul>
+     *   <li>MySQL: <code>jdbc:mysql://<em>hostname</em>:<em>port</em></code>
+     *   <li>Oracle Thin:
+     *       <code>jdbc:oracle:thin:@<em>hostname</em>:<em>port</em>:<em>dbName</em></code>
+     *   <li>DB2: <code>jdbc:db2://<em>hostname</em>:<em>port</em>/<em>dbName</em></code>
+     *   <li>Sybase: <code>jdbc:sybase:Tds:<em>hostname</em>:<em>port</em></code>
+     * </ul>
+     *
+     * @param sourceConfig a basic Source configuration.
+     * @return a database url.
+     */
+    public abstract String getJdbcUrl(JdbcSourceConfig sourceConfig);
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPools.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPools.java
new file mode 100644
index 000000000..b3c70aad1
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPools.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.relational.connection;
+
+import com.zaxxer.hikari.HikariDataSource;
+import org.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A Jdbc Connection pools implementation.
+ *
+ * <p>Process-wide singleton that keeps one {@link HikariDataSource} per {@link
+ * ConnectionPoolId}.
+ *
+ * <p>NOTE(review): only the factory passed to the very first {@link #getInstance} call is
+ * stored; factories passed on later calls are ignored - confirm this is intended when several
+ * dialects run in one JVM.
+ */
+public class JdbcConnectionPools implements ConnectionPools<HikariDataSource, JdbcSourceConfig> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionPools.class);
+
+    private static JdbcConnectionPools INSTANCE;
+    private static JdbcConnectionPoolFactory JDBCCONNECTIONPOOLFACTORY;
+
+    /** Registered pools; every access is guarded by synchronizing on this map. */
+    private final Map<ConnectionPoolId, HikariDataSource> pools = new HashMap<>();
+
+    private JdbcConnectionPools() {}
+
+    /**
+     * Returns the singleton, creating it (and remembering the given factory) on the first call.
+     *
+     * @param jdbcConnectionPoolFactory factory stored when the singleton is first created.
+     * @return the shared instance.
+     */
+    public static synchronized JdbcConnectionPools getInstance(
+            JdbcConnectionPoolFactory jdbcConnectionPoolFactory) {
+        if (INSTANCE != null) {
+            return INSTANCE;
+        }
+        JDBCCONNECTIONPOOLFACTORY = jdbcConnectionPoolFactory;
+        INSTANCE = new JdbcConnectionPools();
+        return INSTANCE;
+    }
+
+    /**
+     * Returns the pool registered under {@code poolId}; when none is registered, one is created
+     * from {@code sourceConfig} with the stored factory and registered first.
+     */
+    @Override
+    public HikariDataSource getOrCreateConnectionPool(
+            ConnectionPoolId poolId, JdbcSourceConfig sourceConfig) {
+        synchronized (pools) {
+            if (pools.containsKey(poolId)) {
+                return pools.get(poolId);
+            }
+            LOG.info("Create and register connection pool {}", poolId);
+            final HikariDataSource created =
+                    JDBCCONNECTIONPOOLFACTORY.createPooledDataSource(sourceConfig);
+            pools.put(poolId, created);
+            return created;
+        }
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java
new file mode 100644
index 000000000..42bb1dbb5
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.source.reader.external;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import io.debezium.config.CommonConnectorConfig;
+import io.debezium.data.Envelope;
+import io.debezium.pipeline.ErrorHandler;
+import io.debezium.pipeline.spi.OffsetContext;
+import io.debezium.relational.RelationalDatabaseSchema;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.util.SchemaNameAdjuster;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+import org.seatunnel.connectors.cdc.base.config.SourceConfig;
+import org.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
+import org.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
+import org.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
+
+import java.time.Instant;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * The context of a fetch task that fetches the data of a snapshot split from a JDBC data source.
+ */
+public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context {
+
+    protected final JdbcSourceConfig sourceConfig;
+    protected final JdbcDataSourceDialect dataSourceDialect;
+    protected final CommonConnectorConfig dbzConnectorConfig;
+    protected final SchemaNameAdjuster schemaNameAdjuster;
+
+    /**
+     * @param sourceConfig source configuration; also supplies the Debezium connector config.
+     * @param dataSourceDialect dialect of the data source this context belongs to.
+     */
+    public JdbcSourceFetchTaskContext(
+            JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
+        this.sourceConfig = sourceConfig;
+        this.dataSourceDialect = dataSourceDialect;
+        this.dbzConnectorConfig = sourceConfig.getDbzConnectorConfig();
+        this.schemaNameAdjuster = SchemaNameAdjuster.create();
+    }
+
+    /** Extracts the table id of the record through {@link SourceRecordUtils}. */
+    @Override
+    public TableId getTableId(SourceRecord record) {
+        // Previously a stub returning null; the same helper is already used by isRecordBetween.
+        return SourceRecordUtils.getTableId(record);
+    }
+
+    /**
+     * Always returns {@code false}.
+     *
+     * <p>NOTE(review): this looks like a placeholder - confirm whether it should delegate to a
+     * helper that inspects the record.
+     */
+    @Override
+    public boolean isDataChangeRecord(SourceRecord record) {
+        return false;
+    }
+
+    /**
+     * Checks whether the split key of the record lies within the given split range.
+     *
+     * @param record record whose split key is extracted.
+     * @param splitStart start of the split range.
+     * @param splitEnd end of the split range.
+     * @return the result of {@link SourceRecordUtils#splitKeyRangeContains}.
+     */
+    @Override
+    public boolean isRecordBetween(SourceRecord record, Object[] splitStart, Object[] splitEnd) {
+        SeaTunnelRowType splitKeyType =
+                getSplitType(getDatabaseSchema().tableFor(SourceRecordUtils.getTableId(record)));
+        Object[] key = SourceRecordUtils.getSplitKey(splitKeyType, record, getSchemaNameAdjuster());
+        return SourceRecordUtils.splitKeyRangeContains(key, splitStart, splitEnd);
+    }
+
+    /**
+     * Applies a change record to the buffered snapshot output.
+     *
+     * <p>CREATE and UPDATE store a READ envelope built from the "after" state under the record
+     * key, DELETE removes the key, and READ is rejected. Records without a value are ignored.
+     *
+     * @param outputBuffer buffer keyed by record key; modified in place.
+     * @param changeRecord change record to apply.
+     * @throws IllegalStateException if the change record carries a READ operation.
+     */
+    @SuppressWarnings("checkstyle:MissingSwitchDefault")
+    @Override
+    public void rewriteOutputBuffer(
+            Map<Struct, SourceRecord> outputBuffer, SourceRecord changeRecord) {
+        Struct key = (Struct) changeRecord.key();
+        Struct value = (Struct) changeRecord.value();
+        if (value == null) {
+            return;
+        }
+        Envelope.Operation operation =
+                Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION));
+        switch (operation) {
+            case CREATE:
+            case UPDATE:
+                Envelope envelope = Envelope.fromSchema(changeRecord.valueSchema());
+                Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+                Struct after = value.getStruct(Envelope.FieldName.AFTER);
+                // the fetch timestamp is taken from the source block of the change record
+                Instant fetchTs =
+                        Instant.ofEpochMilli((Long) source.get(Envelope.FieldName.TIMESTAMP));
+                outputBuffer.put(
+                        key, copyWithValue(changeRecord, envelope.read(after, source, fetchTs)));
+                break;
+            case DELETE:
+                outputBuffer.remove(key);
+                break;
+            case READ:
+                throw new IllegalStateException(
+                        String.format(
+                                "Data change record shouldn't use READ operation, the the record is %s.",
+                                changeRecord));
+        }
+    }
+
+    /**
+     * Rebuilds every snapshot record as a READ envelope whose source timestamp is zero.
+     *
+     * <p>The source struct of each input record is modified in place ({@code ts_ms} set to 0),
+     * while the envelope timestamp of the input record is reused as the fetch timestamp.
+     *
+     * @param snapshotRecords records to convert.
+     * @return the converted records, in iteration order.
+     */
+    @Override
+    public List<SourceRecord> formatMessageTimestamp(Collection<SourceRecord> snapshotRecords) {
+        return snapshotRecords.stream()
+                .map(
+                        record -> {
+                            Envelope envelope = Envelope.fromSchema(record.valueSchema());
+                            Struct value = (Struct) record.value();
+                            Struct updateAfter = value.getStruct(Envelope.FieldName.AFTER);
+                            // set message timestamp (source.ts_ms) to 0L
+                            Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+                            source.put(Envelope.FieldName.TIMESTAMP, 0L);
+                            // extend the fetch timestamp(ts_ms)
+                            Instant fetchTs =
+                                    Instant.ofEpochMilli(
+                                            value.getInt64(Envelope.FieldName.TIMESTAMP));
+                            return copyWithValue(
+                                    record, envelope.read(updateAfter, source, fetchTs));
+                        })
+                .collect(Collectors.toList());
+    }
+
+    /** Copies partition, offset, topic, key and schemas of {@code template} around a new value. */
+    private static SourceRecord copyWithValue(SourceRecord template, Object newValue) {
+        return new SourceRecord(
+                template.sourcePartition(),
+                template.sourceOffset(),
+                template.topic(),
+                template.kafkaPartition(),
+                template.keySchema(),
+                template.key(),
+                template.valueSchema(),
+                newValue);
+    }
+
+    /** Returns the source configuration this context was created with. */
+    public SourceConfig getSourceConfig() {
+        return sourceConfig;
+    }
+
+    /** Returns the dialect this context was created with. */
+    public JdbcDataSourceDialect getDataSourceDialect() {
+        return dataSourceDialect;
+    }
+
+    /** Returns the Debezium connector configuration taken from the source configuration. */
+    public CommonConnectorConfig getDbzConnectorConfig() {
+        return dbzConnectorConfig;
+    }
+
+    /** Returns the schema name adjuster created in the constructor. */
+    public SchemaNameAdjuster getSchemaNameAdjuster() {
+        // Previously returned null, which isRecordBetween then passed on to getSplitKey.
+        return schemaNameAdjuster;
+    }
+
+    /** Returns the relational database schema used to look up tables. */
+    public abstract RelationalDatabaseSchema getDatabaseSchema();
+
+    /** Returns the row type of the split key of the given table. */
+    public abstract SeaTunnelRowType getSplitType(Table table);
+
+    /** Returns the Debezium error handler of this context. */
+    public abstract ErrorHandler getErrorHandler();
+
+    /** Returns the event dispatcher of this context. */
+    public abstract JdbcSourceEventDispatcher getDispatcher();
+
+    /** Returns the Debezium offset context of this context. */
+    public abstract OffsetContext getOffsetContext();
+}