This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7bf00fb19 [feature][connector][cdc] add cdc reader jdbc related (#3433)
7bf00fb19 is described below

commit 7bf00fb19f62ef21c9d1c93da7cf256d18a2e4df
Author: ic4y <[email protected]>
AuthorDate: Tue Nov 15 15:25:26 2022 +0800

    [feature][connector][cdc] add cdc reader jdbc related (#3433)
    
    * [feature][connector] add cdc reader jdbc related
    
    * [feature][connector] fix check style
---
 .../cdc/base/config/JdbcSourceConfig.java          | 134 ++++++++++++++++
 .../cdc/base/dialect/JdbcDataSourceDialect.java    |  67 ++++++++
 .../base/relational/JdbcSourceEventDispatcher.java |  93 +++++++++++
 .../relational/connection/ConnectionPoolId.java    |  60 +++++++
 .../relational/connection/ConnectionPools.java     |  31 ++++
 .../connection/JdbcConnectionFactory.java          |  84 ++++++++++
 .../connection/JdbcConnectionPoolFactory.java      |  67 ++++++++
 .../relational/connection/JdbcConnectionPools.java |  59 +++++++
 .../external/JdbcSourceFetchTaskContext.java       | 176 +++++++++++++++++++++
 9 files changed, 771 insertions(+)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java
new file mode 100644
index 000000000..087268ca9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.config;
+
+import io.debezium.relational.RelationalDatabaseConnectorConfig;
+import org.seatunnel.connectors.cdc.base.source.IncrementalSource;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Base configuration for an {@link IncrementalSource} that reads from a JDBC data source.
+ *
+ * <p>On top of the settings handed to {@link BaseSourceConfig}, this class stores the JDBC
+ * connection coordinates, the captured database/table lists and the connection tuning options,
+ * and exposes each of them through a plain getter.
+ */
+public abstract class JdbcSourceConfig extends BaseSourceConfig {
+
+    protected final String driverClassName;
+    protected final String hostname;
+    protected final int port;
+    protected final String username;
+    protected final String password;
+    protected final List<String> databaseList;
+    protected final List<String> tableList;
+    protected final int fetchSize;
+    protected final String serverTimeZone;
+    protected final Duration connectTimeout;
+    protected final int connectMaxRetries;
+    protected final int connectionPoolSize;
+
+    /**
+     * Stores the JDBC specific settings and forwards the generic ones to the parent class.
+     *
+     * <p>The startup/stop configuration, split size, both distribution factors and the Debezium
+     * properties are passed to the super constructor unchanged; every other argument is kept in
+     * the field of the same name. The given lists are stored as-is (no copy is made).
+     */
+    public JdbcSourceConfig(
+            StartupConfig startupConfig,
+            StopConfig stopConfig,
+            List<String> databaseList,
+            List<String> tableList,
+            int splitSize,
+            double distributionFactorUpper,
+            double distributionFactorLower,
+            Properties dbzProperties,
+            String driverClassName,
+            String hostname,
+            int port,
+            String username,
+            String password,
+            int fetchSize,
+            String serverTimeZone,
+            Duration connectTimeout,
+            int connectMaxRetries,
+            int connectionPoolSize) {
+        super(
+                startupConfig,
+                stopConfig,
+                splitSize,
+                distributionFactorUpper,
+                distributionFactorLower,
+                dbzProperties);
+
+        // What to capture.
+        this.databaseList = databaseList;
+        this.tableList = tableList;
+
+        // Where and how to log in.
+        this.driverClassName = driverClassName;
+        this.hostname = hostname;
+        this.port = port;
+        this.username = username;
+        this.password = password;
+
+        // Connection and fetch tuning.
+        this.fetchSize = fetchSize;
+        this.serverTimeZone = serverTimeZone;
+        this.connectTimeout = connectTimeout;
+        this.connectMaxRetries = connectMaxRetries;
+        this.connectionPoolSize = connectionPoolSize;
+    }
+
+    /** Returns the Debezium relational connector configuration supplied by the subclass. */
+    public abstract RelationalDatabaseConnectorConfig getDbzConnectorConfig();
+
+    /** Returns the JDBC driver class name. */
+    public String getDriverClassName() {
+        return this.driverClassName;
+    }
+
+    /** Returns the database host name. */
+    public String getHostname() {
+        return this.hostname;
+    }
+
+    /** Returns the database port. */
+    public int getPort() {
+        return this.port;
+    }
+
+    /** Returns the user name used to log in. */
+    public String getUsername() {
+        return this.username;
+    }
+
+    /** Returns the password used to log in. */
+    public String getPassword() {
+        return this.password;
+    }
+
+    /** Returns the database list exactly as it was passed to the constructor. */
+    public List<String> getDatabaseList() {
+        return this.databaseList;
+    }
+
+    /** Returns the table list exactly as it was passed to the constructor. */
+    public List<String> getTableList() {
+        return this.tableList;
+    }
+
+    /** Returns the configured fetch size. */
+    public int getFetchSize() {
+        return this.fetchSize;
+    }
+
+    /** Returns the configured server time zone. */
+    public String getServerTimeZone() {
+        return this.serverTimeZone;
+    }
+
+    /** Returns the configured connect timeout. */
+    public Duration getConnectTimeout() {
+        return this.connectTimeout;
+    }
+
+    /** Returns the configured maximum number of connection retries. */
+    public int getConnectMaxRetries() {
+        return this.connectMaxRetries;
+    }
+
+    /** Returns the configured connection pool size. */
+    public int getConnectionPoolSize() {
+        return this.connectionPoolSize;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
new file mode 100644
index 000000000..d17f0267a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.dialect;
+
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.TableId;
+import org.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+import 
org.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionFactory;
+import 
org.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
+import org.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
+import 
org.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+
+import java.util.List;
+
+/** A {@link DataSourceDialect} for sources that are configured by a {@link JdbcSourceConfig}. */
+public interface JdbcDataSourceDialect extends DataSourceDialect<JdbcSourceConfig> {
+
+    /** Discovers the list of table to capture. */
+    @Override
+    List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig);
+
+    /**
+     * Builds a {@link JdbcConnection} whose physical connections come from a {@link
+     * JdbcConnectionFactory} created with {@link #getPooledDataSourceFactory()}, and connects it
+     * before handing it back.
+     *
+     * @param sourceConfig a basic source configuration.
+     * @return the already connected {@link JdbcConnection}.
+     * @throws SeaTunnelException wrapping any exception raised while connecting.
+     */
+    default JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
+        final JdbcConnection connection =
+                new JdbcConnection(
+                        sourceConfig.getDbzConfiguration(),
+                        new JdbcConnectionFactory(sourceConfig, getPooledDataSourceFactory()));
+        try {
+            connection.connect();
+            return connection;
+        } catch (Exception e) {
+            throw new SeaTunnelException(e);
+        }
+    }
+
+    /** Get a connection pool factory to create connection pool. */
+    JdbcConnectionPoolFactory getPooledDataSourceFactory();
+
+    @Override
+    FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase);
+
+    @Override
+    JdbcSourceFetchTaskContext createFetchTaskContext(
+            SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig);
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/JdbcSourceEventDispatcher.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/JdbcSourceEventDispatcher.java
new file mode 100644
index 000000000..4cf92927c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/JdbcSourceEventDispatcher.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.relational;
+
+import io.debezium.config.CommonConnectorConfig;
+import io.debezium.connector.base.ChangeEventQueue;
+import io.debezium.pipeline.DataChangeEvent;
+import io.debezium.pipeline.EventDispatcher;
+import io.debezium.pipeline.source.spi.EventMetadataProvider;
+import io.debezium.pipeline.spi.ChangeEventCreator;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.schema.DataCollectionFilters;
+import io.debezium.schema.DatabaseSchema;
+import io.debezium.schema.TopicSelector;
+import io.debezium.util.SchemaNameAdjuster;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.seatunnel.connectors.cdc.base.source.offset.Offset;
+import org.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import 
org.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent;
+import org.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
+
+import java.util.Map;
+
+/**
+ * An {@link EventDispatcher} for {@link TableId} that can additionally emit watermark events.
+ *
+ * <p>The dispatcher keeps its own reference to the {@link ChangeEventQueue} it was built with
+ * (exposed through {@link #getQueue()}) and remembers the primary topic of the topic selector,
+ * which is used as the topic of every watermark record.
+ *
+ * <p>NOTE(review): earlier documentation of this class mentioned overriding methods so that
+ * {@link HistoryRecord}s are dispatched directly; the class as written defines no such override.
+ */
+public class JdbcSourceEventDispatcher extends EventDispatcher<TableId> {
+
+    private final ChangeEventQueue<DataChangeEvent> queue;
+
+    private final String topic;
+
+    /**
+     * Passes every argument on to {@link EventDispatcher} and keeps the queue and the primary
+     * topic of {@code topicSelector} for later use.
+     */
+    public JdbcSourceEventDispatcher(
+            CommonConnectorConfig connectorConfig,
+            TopicSelector<TableId> topicSelector,
+            DatabaseSchema<TableId> schema,
+            ChangeEventQueue<DataChangeEvent> queue,
+            DataCollectionFilters.DataCollectionFilter<TableId> filter,
+            ChangeEventCreator changeEventCreator,
+            EventMetadataProvider metadataProvider,
+            SchemaNameAdjuster schemaNameAdjuster) {
+        super(
+                connectorConfig,
+                topicSelector,
+                schema,
+                queue,
+                filter,
+                changeEventCreator,
+                metadataProvider,
+                schemaNameAdjuster);
+        this.queue = queue;
+        this.topic = topicSelector.getPrimaryTopic();
+    }
+
+    /** Returns the queue this dispatcher was constructed with. */
+    public ChangeEventQueue<DataChangeEvent> getQueue() {
+        return this.queue;
+    }
+
+    /**
+     * Creates a watermark record for the given split and puts it on the queue.
+     *
+     * @param sourcePartition the source partition handed to {@link WatermarkEvent}.
+     * @param sourceSplit the split whose id is stamped on the watermark.
+     * @param watermark the offset carried by the watermark.
+     * @param watermarkKind the kind of watermark to emit.
+     * @throws InterruptedException propagated from enqueueing the event.
+     */
+    public void dispatchWatermarkEvent(
+            Map<String, ?> sourcePartition,
+            SourceSplitBase sourceSplit,
+            Offset watermark,
+            WatermarkKind watermarkKind)
+            throws InterruptedException {
+        final SourceRecord watermarkRecord =
+                WatermarkEvent.create(
+                        sourcePartition, topic, sourceSplit.splitId(), watermarkKind, watermark);
+        final DataChangeEvent watermarkEvent = new DataChangeEvent(watermarkRecord);
+        queue.enqueue(watermarkEvent);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/ConnectionPoolId.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/ConnectionPoolId.java
new file mode 100644
index 000000000..7074c9ec7
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/ConnectionPoolId.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.relational.connection;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Identifies a connection pool by host, port and user name.
+ *
+ * <p>Two identifiers are equal when all three components are equal, which makes instances
+ * usable as hash map keys.
+ */
+public class ConnectionPoolId implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private final String host;
+    private final int port;
+    private final String username;
+
+    public ConnectionPoolId(String host, int port, String username) {
+        this.host = host;
+        this.port = port;
+        this.username = username;
+    }
+
+    /** Compares host, port and user name; any non-{@code ConnectionPoolId} is unequal. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o instanceof ConnectionPoolId) {
+            final ConnectionPoolId other = (ConnectionPoolId) o;
+            return port == other.port
+                    && Objects.equals(host, other.host)
+                    && Objects.equals(username, other.username);
+        }
+        return false;
+    }
+
+    /** Hash over the same three components that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        return Objects.hash(host, port, username);
+    }
+
+    /** Renders the identifier as {@code username@host:port}. */
+    @Override
+    public String toString() {
+        return username + "@" + host + ":" + port;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/ConnectionPools.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/ConnectionPools.java
new file mode 100644
index 000000000..f641ae0fc
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/ConnectionPools.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.relational.connection;
+
+import org.seatunnel.connectors.cdc.base.config.SourceConfig;
+
+/**
+ * A pool collection that consists of multiple connection pools, each addressed by a
+ * {@link ConnectionPoolId}.
+ *
+ * @param <P> the connection pool type returned to callers.
+ * @param <C> the source configuration type accepted when looking up or creating a pool.
+ */
+
+public interface ConnectionPools<P, C extends SourceConfig> {
+
+    /**
+     * Gets a connection pool from the pools, creating a new pool if none exists yet for the
+     * given identifier.
+     *
+     * @param poolId the identifier of the requested pool.
+     * @param sourceConfig the source configuration passed along with the request.
+     * @return the pool registered under {@code poolId}.
+     */
+    P getOrCreateConnectionPool(ConnectionPoolId poolId, C sourceConfig);
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionFactory.java
new file mode 100644
index 000000000..efc1e4baf
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.relational.connection;
+
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+
+import com.zaxxer.hikari.HikariDataSource;
+import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.jdbc.JdbcConnection;
+import org.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * A {@link JdbcConnection.ConnectionFactory} that borrows connections from a pooled {@link
+ * HikariDataSource}.
+ *
+ * <p>The pool is looked up (or created) through {@link JdbcConnectionPools} using the host, port
+ * and user name of the {@link JdbcSourceConfig}. Failed attempts to obtain a connection are
+ * retried up to {@link JdbcSourceConfig#getConnectMaxRetries()} times.
+ */
+public class JdbcConnectionFactory implements JdbcConnection.ConnectionFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionFactory.class);
+
+    /** Pause between two consecutive connection attempts, in milliseconds. */
+    private static final long RETRY_INTERVAL_MILLIS = 300L;
+
+    private final JdbcSourceConfig sourceConfig;
+    private final JdbcConnectionPoolFactory jdbcConnectionPoolFactory;
+
+    public JdbcConnectionFactory(
+            JdbcSourceConfig sourceConfig, JdbcConnectionPoolFactory jdbcConnectionPoolFactory) {
+        this.sourceConfig = sourceConfig;
+        this.jdbcConnectionPoolFactory = jdbcConnectionPoolFactory;
+    }
+
+    /**
+     * Obtains a connection from the pool that belongs to the configured host, port and user.
+     *
+     * @param config the Debezium JDBC configuration; not consulted here, the connection
+     *     settings come from the {@link JdbcSourceConfig} given to the constructor.
+     * @return a connection borrowed from the pool.
+     * @throws SQLException if the single attempt made when the configured retry count is not
+     *     positive fails.
+     * @throws SeaTunnelException if every attempt failed, or if the thread was interrupted
+     *     while waiting before the next attempt (the interrupt flag is restored).
+     */
+    @Override
+    public Connection connect(JdbcConfiguration config) throws SQLException {
+        final int connectRetryTimes = sourceConfig.getConnectMaxRetries();
+
+        final ConnectionPoolId connectionPoolId =
+                new ConnectionPoolId(
+                        sourceConfig.getHostname(),
+                        sourceConfig.getPort(),
+                        sourceConfig.getUsername());
+
+        final HikariDataSource dataSource =
+                JdbcConnectionPools.getInstance(jdbcConnectionPoolFactory)
+                        .getOrCreateConnectionPool(connectionPoolId, sourceConfig);
+
+        for (int attempt = 1; attempt <= connectRetryTimes; attempt++) {
+            try {
+                return dataSource.getConnection();
+            } catch (SQLException e) {
+                if (attempt >= connectRetryTimes) {
+                    LOG.error("Get connection failed after retry {} times", attempt);
+                    throw new SeaTunnelException(e);
+                }
+                // Report the failure before waiting so the log reflects when it happened.
+                LOG.warn("Get connection failed, retry times {}", attempt);
+                try {
+                    Thread.sleep(RETRY_INTERVAL_MILLIS);
+                } catch (InterruptedException ie) {
+                    // Restore the flag so callers further up can still observe the interrupt.
+                    Thread.currentThread().interrupt();
+                    throw new SeaTunnelException(
+                            "Failed to get connection, interrupted while doing another attempt",
+                            ie);
+                }
+            }
+        }
+        // Only reached when the configured retry count is zero or negative: try once, unguarded.
+        return dataSource.getConnection();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPoolFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPoolFactory.java
new file mode 100644
index 000000000..6152a55b4
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPoolFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.relational.connection;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+
+/**
+ * Base class for factories that build a pooled {@link HikariDataSource} from a {@link
+ * JdbcSourceConfig}. Subclasses only have to supply the database specific JDBC URL.
+ */
+public abstract class JdbcConnectionPoolFactory {
+
+    public static final String CONNECTION_POOL_PREFIX = "connection-pool-";
+    public static final String SERVER_TIMEZONE_KEY = "serverTimezone";
+    public static final int MINIMUM_POOL_SIZE = 1;
+
+    /**
+     * Creates a new Hikari pool configured from the given source configuration.
+     *
+     * <p>The pool is named after host and port, uses {@link #getJdbcUrl(JdbcSourceConfig)} as
+     * its URL, keeps at least {@link #MINIMUM_POOL_SIZE} idle connection and grows up to the
+     * configured connection pool size.
+     *
+     * @param sourceConfig the configuration to read connection settings from.
+     * @return a newly created pooled data source.
+     */
+    public HikariDataSource createPooledDataSource(JdbcSourceConfig sourceConfig) {
+        final HikariConfig hikariConfig = new HikariConfig();
+
+        final String poolName =
+                CONNECTION_POOL_PREFIX + sourceConfig.getHostname() + ":" + sourceConfig.getPort();
+        hikariConfig.setPoolName(poolName);
+
+        // Connection coordinates and credentials.
+        hikariConfig.setJdbcUrl(getJdbcUrl(sourceConfig));
+        hikariConfig.setUsername(sourceConfig.getUsername());
+        hikariConfig.setPassword(sourceConfig.getPassword());
+
+        // Pool sizing and timeout.
+        hikariConfig.setMinimumIdle(MINIMUM_POOL_SIZE);
+        hikariConfig.setMaximumPoolSize(sourceConfig.getConnectionPoolSize());
+        hikariConfig.setConnectionTimeout(sourceConfig.getConnectTimeout().toMillis());
+
+        hikariConfig.addDataSourceProperty(SERVER_TIMEZONE_KEY, sourceConfig.getServerTimeZone());
+        hikariConfig.setDriverClassName(sourceConfig.getDriverClassName());
+
+        // optional optimization configurations for pooled DataSource
+        hikariConfig.addDataSourceProperty("cachePrepStmts", "true");
+        hikariConfig.addDataSourceProperty("prepStmtCacheSize", "250");
+        hikariConfig.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
+
+        return new HikariDataSource(hikariConfig);
+    }
+
+    /**
+     * Builds the JDBC URL for the concrete database.
+     *
+     * <p>URL formats differ between relational databases, for example:
+     *
+     * <ul>
+     *   <li>MySQL: <code>jdbc:mysql://<em>hostname</em>:<em>port</em></code>
+     *   <li>Oracle Thin:
+     *       <code>jdbc:oracle:thin:@<em>hostname</em>:<em>port</em>:<em>dbName</em></code>
+     *   <li>DB2: <code>jdbc:db2://<em>hostname</em>:<em>port</em>/<em>dbName</em></code>
+     *   <li>Sybase: <code>jdbc:sybase:Tds:<em>hostname</em>:<em>port</em></code>
+     * </ul>
+     *
+     * <p>so every dialect has to generate the URL itself.
+     *
+     * @param sourceConfig a basic Source configuration.
+     * @return a database url.
+     */
+    public abstract String getJdbcUrl(JdbcSourceConfig sourceConfig);
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPools.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPools.java
new file mode 100644
index 000000000..b3c70aad1
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPools.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.relational.connection;
+
+import com.zaxxer.hikari.HikariDataSource;
+import org.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A process-wide registry of {@link HikariDataSource} pools keyed by {@link ConnectionPoolId}.
+ *
+ * <p>The registry is a lazily created singleton. Only the {@link JdbcConnectionPoolFactory}
+ * passed to the first {@link #getInstance(JdbcConnectionPoolFactory)} call is retained; the
+ * argument of every later call is ignored and all pools are created by that first factory.
+ */
+public class JdbcConnectionPools implements ConnectionPools<HikariDataSource, JdbcSourceConfig> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionPools.class);
+
+    private static JdbcConnectionPools instance;
+    private static JdbcConnectionPoolFactory poolFactory;
+
+    private final Map<ConnectionPoolId, HikariDataSource> pools = new HashMap<>();
+
+    private JdbcConnectionPools() {}
+
+    /**
+     * Returns the singleton, creating it on first use.
+     *
+     * @param jdbcConnectionPoolFactory the factory to remember when the singleton is created;
+     *     ignored once the singleton exists.
+     * @return the shared registry.
+     */
+    public static synchronized JdbcConnectionPools getInstance(
+            JdbcConnectionPoolFactory jdbcConnectionPoolFactory) {
+        if (instance != null) {
+            return instance;
+        }
+        poolFactory = jdbcConnectionPoolFactory;
+        instance = new JdbcConnectionPools();
+        return instance;
+    }
+
+    /**
+     * Returns the pool registered under {@code poolId}, creating and registering it from {@code
+     * sourceConfig} when it is missing. Lookups and creation are serialized on the pool map.
+     */
+    @Override
+    public HikariDataSource getOrCreateConnectionPool(
+            ConnectionPoolId poolId, JdbcSourceConfig sourceConfig) {
+        synchronized (pools) {
+            return pools.computeIfAbsent(
+                    poolId,
+                    missingId -> {
+                        LOG.info("Create and register connection pool {}", missingId);
+                        return poolFactory.createPooledDataSource(sourceConfig);
+                    });
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java
new file mode 100644
index 000000000..42bb1dbb5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/seatunnel/connectors/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seatunnel.connectors.cdc.base.source.reader.external;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import io.debezium.config.CommonConnectorConfig;
+import io.debezium.data.Envelope;
+import io.debezium.pipeline.ErrorHandler;
+import io.debezium.pipeline.spi.OffsetContext;
+import io.debezium.relational.RelationalDatabaseSchema;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.util.SchemaNameAdjuster;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
+import org.seatunnel.connectors.cdc.base.config.SourceConfig;
+import org.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
+import org.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
+import org.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
+
+import java.time.Instant;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * The context for fetch task that fetching data of snapshot split from JDBC 
data source.
+ */
+public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context {
+
+    protected final JdbcSourceConfig sourceConfig;
+    protected final JdbcDataSourceDialect dataSourceDialect;
+    protected final CommonConnectorConfig dbzConnectorConfig;
+    protected final SchemaNameAdjuster schemaNameAdjuster;
+
+    public JdbcSourceFetchTaskContext(
+        JdbcSourceConfig sourceConfig, JdbcDataSourceDialect 
dataSourceDialect) {
+        this.sourceConfig = sourceConfig;
+        this.dataSourceDialect = dataSourceDialect;
+        this.dbzConnectorConfig = sourceConfig.getDbzConnectorConfig();
+        this.schemaNameAdjuster = SchemaNameAdjuster.create();
+    }
+
+    @Override
+    public TableId getTableId(SourceRecord record) {
+        return null;
+    }
+
+    @Override
+    public boolean isDataChangeRecord(SourceRecord record) {
+        return false;
+    }
+
+    @Override
+    public boolean isRecordBetween(SourceRecord record, Object[] splitStart, 
Object[] splitEnd) {
+        SeaTunnelRowType splitKeyType =
+            
getSplitType(getDatabaseSchema().tableFor(SourceRecordUtils.getTableId(record)));
+        Object[] key = SourceRecordUtils.getSplitKey(splitKeyType, record, 
getSchemaNameAdjuster());
+        return SourceRecordUtils.splitKeyRangeContains(key, splitStart, 
splitEnd);
+    }
+
+    @SuppressWarnings("checkstyle:MissingSwitchDefault")
+    @Override
+    public void rewriteOutputBuffer(
+        Map<Struct, SourceRecord> outputBuffer, SourceRecord changeRecord) {
+        Struct key = (Struct) changeRecord.key();
+        Struct value = (Struct) changeRecord.value();
+        if (value != null) {
+            Envelope.Operation operation =
+                
Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION));
+            switch (operation) {
+                case CREATE:
+                case UPDATE:
+                    Envelope envelope = 
Envelope.fromSchema(changeRecord.valueSchema());
+                    Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+                    Struct after = value.getStruct(Envelope.FieldName.AFTER);
+                    Instant fetchTs =
+                        Instant.ofEpochMilli((Long) 
source.get(Envelope.FieldName.TIMESTAMP));
+                    SourceRecord record =
+                        new SourceRecord(
+                            changeRecord.sourcePartition(),
+                            changeRecord.sourceOffset(),
+                            changeRecord.topic(),
+                            changeRecord.kafkaPartition(),
+                            changeRecord.keySchema(),
+                            changeRecord.key(),
+                            changeRecord.valueSchema(),
+                            envelope.read(after, source, fetchTs));
+                    outputBuffer.put(key, record);
+                    break;
+                case DELETE:
+                    outputBuffer.remove(key);
+                    break;
+                case READ:
+                    throw new IllegalStateException(
+                        String.format(
+                            "Data change record shouldn't use READ operation, 
the the record is %s.",
+                            changeRecord));
+            }
+        }
+    }
+
+    @Override
+    public List<SourceRecord> formatMessageTimestamp(Collection<SourceRecord> 
snapshotRecords) {
+        return snapshotRecords.stream()
+            .map(
+                record -> {
+                    Envelope envelope = 
Envelope.fromSchema(record.valueSchema());
+                    Struct value = (Struct) record.value();
+                    Struct updateAfter = 
value.getStruct(Envelope.FieldName.AFTER);
+                    // set message timestamp (source.ts_ms) to 0L
+                    Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+                    source.put(Envelope.FieldName.TIMESTAMP, 0L);
+                    // extend the fetch timestamp(ts_ms)
+                    Instant fetchTs =
+                        Instant.ofEpochMilli(
+                            value.getInt64(Envelope.FieldName.TIMESTAMP));
+                    SourceRecord sourceRecord =
+                        new SourceRecord(
+                            record.sourcePartition(),
+                            record.sourceOffset(),
+                            record.topic(),
+                            record.kafkaPartition(),
+                            record.keySchema(),
+                            record.key(),
+                            record.valueSchema(),
+                            envelope.read(updateAfter, source, fetchTs));
+                    return sourceRecord;
+                })
+            .collect(Collectors.toList());
+    }
+
+    public SourceConfig getSourceConfig() {
+        return sourceConfig;
+    }
+
+    public JdbcDataSourceDialect getDataSourceDialect() {
+        return dataSourceDialect;
+    }
+
+    public CommonConnectorConfig getDbzConnectorConfig() {
+        return dbzConnectorConfig;
+    }
+
+    public SchemaNameAdjuster getSchemaNameAdjuster() {
+        return null;
+    }
+
+    public abstract RelationalDatabaseSchema getDatabaseSchema();
+
+    public abstract SeaTunnelRowType getSplitType(Table table);
+
+    public abstract ErrorHandler getErrorHandler();
+
+    public abstract JdbcSourceEventDispatcher getDispatcher();
+
+    public abstract OffsetContext getOffsetContext();
+}


Reply via email to