This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new f559ea2c [Feature](cdc) add DB2 database sync (#316)
f559ea2c is described below

commit f559ea2cda6d367260b73a209898aa4af08bf6de
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Fri Jul 26 10:44:26 2024 +0800

    [Feature](cdc) add DB2 database sync (#316)
---
 flink-doris-connector/pom.xml                      |  12 ++
 .../jsondebezium/JsonDebeziumChangeUtils.java      |   4 +
 .../org/apache/doris/flink/tools/cdc/CdcTools.java |  13 ++
 .../doris/flink/tools/cdc/DatabaseSyncConfig.java  |   2 +
 .../doris/flink/tools/cdc/SourceConnector.java     |   3 +-
 .../doris/flink/tools/cdc/db2/Db2DatabaseSync.java | 227 +++++++++++++++++++++
 .../flink/tools/cdc/db2/Db2DateConverter.java      | 133 ++++++++++++
 .../{SourceConnector.java => db2/Db2Schema.java}   |  32 +--
 .../apache/doris/flink/tools/cdc/db2/Db2Type.java  |  94 +++++++++
 .../flink/tools/cdc/CdcDb2SyncDatabaseCase.java    | 104 ++++++++++
 .../doris/flink/tools/cdc/db2/Db2TypeTest.java     |  49 +++++
 11 files changed, 660 insertions(+), 13 deletions(-)

diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index b6e90bea..2d7b2875 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -286,6 +286,18 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-connector-db2-cdc</artifactId>
+            <version>${flink.sql.cdc.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>flink-shaded-guava</artifactId>
+                    <groupId>org.apache.flink</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-sql-connector-mongodb-cdc</artifactId>
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
index 571bacfb..492a7d29 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.db2.Db2Type;
 import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
 import org.apache.doris.flink.tools.cdc.oracle.OracleType;
 import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
@@ -84,6 +85,9 @@ public class JsonDebeziumChangeUtils {
             case SQLSERVER:
                 dorisTypeName = SqlServerType.toDorisType(dataType, length, 
scale);
                 break;
+            case DB2:
+                dorisTypeName = Db2Type.toDorisType(dataType, length, scale);
+                break;
             default:
                 String errMsg = sourceConnector + " not support " + dataType + 
" schema change.";
                 throw new UnsupportedOperationException(errMsg);
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 3ab38a19..b62f0f52 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
+import org.apache.doris.flink.tools.cdc.db2.Db2DatabaseSync;
 import org.apache.doris.flink.tools.cdc.mongodb.MongoDBDatabaseSync;
 import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
 import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync;
@@ -61,6 +62,9 @@ public class CdcTools {
             case DatabaseSyncConfig.MONGODB_SYNC_DATABASE:
                 createMongoDBSyncDatabase(opArgs);
                 break;
+            case DatabaseSyncConfig.DB2_SYNC_DATABASE:
+                createDb2SyncDatabase(opArgs);
+                break;
             default:
                 System.out.println("Unknown operation " + operation);
                 System.exit(1);
@@ -112,6 +116,15 @@ public class CdcTools {
         syncDatabase(params, databaseSync, mongoConfig, 
SourceConnector.MONGODB);
     }
 
+    private static void createDb2SyncDatabase(String[] opArgs) throws 
Exception {
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+        Preconditions.checkArgument(params.has(DatabaseSyncConfig.DB2_CONF));
+        Map<String, String> db2Map = getConfigMap(params, 
DatabaseSyncConfig.DB2_CONF);
+        Configuration db2Config = Configuration.fromMap(db2Map);
+        DatabaseSync databaseSync = new Db2DatabaseSync();
+        syncDatabase(params, databaseSync, db2Config, SourceConnector.DB2);
+    }
+
     private static void syncDatabase(
             MultipleParameterTool params,
             DatabaseSync databaseSync,
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java
index 62b77645..6c78d5cd 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java
@@ -24,12 +24,14 @@ public class DatabaseSyncConfig {
     public static final String POSTGRES_SYNC_DATABASE = 
"postgres-sync-database";
     public static final String SQLSERVER_SYNC_DATABASE = 
"sqlserver-sync-database";
     public static final String MONGODB_SYNC_DATABASE = "mongodb-sync-database";
+    public static final String DB2_SYNC_DATABASE = "db2-sync-database";
 
     public static final String MYSQL_CONF = "mysql-conf";
     public static final String ORACLE_CONF = "oracle-conf";
     public static final String POSTGRES_CONF = "postgres-conf";
     public static final String SQLSERVER_CONF = "sqlserver-conf";
     public static final String MONGODB_CONF = "mongodb-conf";
+    public static final String DB2_CONF = "db2-conf";
 
     ///////////// source-conf ////////
     public static final String DATABASE_NAME = "database-name";
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java
index 27e9600e..47c8dfba 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java
@@ -22,7 +22,8 @@ public enum SourceConnector {
     ORACLE("oracle"),
     POSTGRES("postgres"),
     SQLSERVER("sqlserver"),
-    MONGODB("mongodb");
+    MONGODB("mongodb"),
+    DB2("db2");
 
     public final String connectorName;
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java
new file mode 100644
index 00000000..2dcd21a9
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.db2;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions;
+import org.apache.flink.cdc.connectors.base.options.SourceOptions;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
+import org.apache.flink.cdc.connectors.db2.Db2Source;
+import org.apache.flink.cdc.connectors.db2.source.Db2SourceBuilder;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.table.DebeziumOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.tools.cdc.DatabaseSync;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import 
org.apache.doris.flink.tools.cdc.deserialize.DorisJsonDebeziumDeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE;
+import static 
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES;
+import static 
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
+import static 
org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.SERVER_TIME_ZONE;
+import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
+import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
+import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
+import static 
org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
+
+public class Db2DatabaseSync extends DatabaseSync {
+    public static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(50000)
+                    .withDescription("Integer port number of the DB2 database 
server.");
+    private static final Logger LOG = 
LoggerFactory.getLogger(Db2DatabaseSync.class);
+
+    private static final String JDBC_URL = "jdbc:db2://%s:%d/%s";
+
+    public Db2DatabaseSync() throws SQLException {
+        super();
+    }
+
+    @Override
+    public void registerDriver() throws SQLException {
+        try {
+            Class.forName("com.ibm.db2.jcc.DB2Driver");
+            LOG.info(" Loaded the JDBC driver");
+        } catch (ClassNotFoundException ex) {
+            throw new SQLException(
+                    "No suitable driver found, can not found class 
com.ibm.db2.jcc.DB2Driver");
+        }
+    }
+
+    @Override
+    public Connection getConnection() throws SQLException {
+        String jdbcUrl =
+                String.format(
+                        JDBC_URL,
+                        config.get(JdbcSourceOptions.HOSTNAME),
+                        config.get(PORT),
+                        config.get(JdbcSourceOptions.DATABASE_NAME));
+        Properties pro = new Properties();
+        pro.setProperty("user", config.get(JdbcSourceOptions.USERNAME));
+        pro.setProperty("password", config.get(JdbcSourceOptions.PASSWORD));
+        return DriverManager.getConnection(jdbcUrl, pro);
+    }
+
+    @Override
+    public List<SourceSchema> getSchemaList() throws Exception {
+        String databaseName = config.get(JdbcSourceOptions.DATABASE_NAME);
+        String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME);
+        List<SourceSchema> schemaList = new ArrayList<>();
+        LOG.info("database-name {}, schema-name {}", databaseName, schemaName);
+        try (Connection conn = getConnection()) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            try (ResultSet tables =
+                    metaData.getTables(null, schemaName, "%", new String[] 
{"TABLE"})) {
+                while (tables.next()) {
+                    String tableName = tables.getString("TABLE_NAME");
+                    String tableComment = tables.getString("REMARKS");
+                    if (!isSyncNeeded(tableName)) {
+                        continue;
+                    }
+                    SourceSchema sourceSchema =
+                            new Db2Schema(
+                                    metaData, databaseName, schemaName, 
tableName, tableComment);
+                    sourceSchema.setModel(
+                            !sourceSchema.primaryKeys.isEmpty()
+                                    ? DataModel.UNIQUE
+                                    : DataModel.DUPLICATE);
+                    schemaList.add(sourceSchema);
+                }
+            }
+        }
+        return schemaList;
+    }
+
+    @Override
+    public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment 
env) {
+        String databaseName = config.get(JdbcSourceOptions.DATABASE_NAME);
+        String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME);
+        Preconditions.checkNotNull(databaseName, "database-name in DB2 is 
required");
+        Preconditions.checkNotNull(schemaName, "schema-name in DB2 is 
required");
+
+        String tableName = config.get(JdbcSourceOptions.TABLE_NAME);
+        String hostname = config.get(JdbcSourceOptions.HOSTNAME);
+        Integer port = config.get(PORT);
+        String username = config.get(JdbcSourceOptions.USERNAME);
+        String password = config.get(JdbcSourceOptions.PASSWORD);
+
+        StartupOptions startupOptions = StartupOptions.initial();
+        String startupMode = config.get(SourceOptions.SCAN_STARTUP_MODE);
+        if ("initial".equalsIgnoreCase(startupMode)) {
+            startupOptions = StartupOptions.initial();
+        } else if ("latest-offset".equalsIgnoreCase(startupMode)) {
+            startupOptions = StartupOptions.latest();
+        }
+
+        // debezium properties set
+        Properties debeziumProperties = new Properties();
+        debeziumProperties.putAll(Db2DateConverter.DEFAULT_PROPS);
+        debeziumProperties.put("decimal.handling.mode", "string");
+
+        for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+            if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
+                debeziumProperties.put(
+                        
key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value);
+            }
+        }
+
+        DebeziumDeserializationSchema<String> schema;
+        if (ignoreDefaultValue) {
+            schema = new DorisJsonDebeziumDeserializationSchema();
+        } else {
+            Map<String, Object> customConverterConfigs = new HashMap<>();
+            schema = new JsonDebeziumDeserializationSchema(false, 
customConverterConfigs);
+        }
+
+        if (config.getBoolean(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, 
true)) {
+            JdbcIncrementalSource<String> db2IncrementalSource =
+                    Db2SourceBuilder.Db2IncrementalSource.<String>builder()
+                            .hostname(hostname)
+                            .port(port)
+                            .databaseList(databaseName)
+                            .tableList(tableName)
+                            .username(username)
+                            .password(password)
+                            .deserializer(schema)
+                            .debeziumProperties(debeziumProperties)
+                            .startupOptions(startupOptions)
+                            .includeSchemaChanges(true)
+                            .debeziumProperties(debeziumProperties)
+                            .serverTimeZone(config.get(SERVER_TIME_ZONE))
+                            
.splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE))
+                            
.splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE))
+                            .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE))
+                            .connectTimeout(config.get(CONNECT_TIMEOUT))
+                            
.connectionPoolSize(config.get(CONNECTION_POOL_SIZE))
+                            .connectMaxRetries(config.get(CONNECT_MAX_RETRIES))
+                            .distributionFactorUpper(
+                                    
config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND))
+                            .distributionFactorLower(
+                                    
config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND))
+                            .build();
+            return env.fromSource(
+                    db2IncrementalSource, WatermarkStrategy.noWatermarks(), 
"Db2IncrementalSource");
+
+        } else {
+            DebeziumSourceFunction<String> db2Source =
+                    Db2Source.<String>builder()
+                            .hostname(hostname)
+                            .port(port)
+                            .database(databaseName)
+                            .tableList(tableName)
+                            .username(username)
+                            .password(password)
+                            .debeziumProperties(debeziumProperties)
+                            .startupOptions(startupOptions)
+                            .deserializer(schema)
+                            .build();
+            return env.addSource(db2Source, "Db2 Source");
+        }
+    }
+
+    @Override
+    public String getTableListPrefix() {
+        return config.get(JdbcSourceOptions.SCHEMA_NAME);
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DateConverter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DateConverter.java
new file mode 100644
index 00000000..9d681d8a
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DateConverter.java
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.db2;
+
+import 
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+
+import io.debezium.spi.converter.CustomConverter;
+import io.debezium.spi.converter.RelationalColumn;
+import org.apache.doris.flink.tools.cdc.DatabaseSyncConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.DateTimeException;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+public class Db2DateConverter implements CustomConverter<SchemaBuilder, 
RelationalColumn> {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Db2DateConverter.class);
+    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
+    private DateTimeFormatter timestampFormatter = 
DateTimeFormatter.ISO_DATE_TIME;
+    private final DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
+
+    protected static final Properties DEFAULT_PROPS = new Properties();
+
+    static {
+        DEFAULT_PROPS.setProperty(DatabaseSyncConfig.CONVERTERS, 
DatabaseSyncConfig.DATE);
+        DEFAULT_PROPS.setProperty(
+                DatabaseSyncConfig.DATE_TYPE,
+                "org.apache.doris.flink.tools.cdc.db2.Db2DateConverter");
+        DEFAULT_PROPS.setProperty(
+                DatabaseSyncConfig.DATE_FORMAT_DATE, 
DatabaseSyncConfig.YEAR_MONTH_DAY_FORMAT);
+        DEFAULT_PROPS.setProperty(
+                DatabaseSyncConfig.DATE_FORMAT_TIMESTAMP, 
DatabaseSyncConfig.DATETIME_MICRO_FORMAT);
+    }
+
+    @Override
+    public void configure(Properties props) {
+        readProps(props, "format.date", p -> dateFormatter = 
DateTimeFormatter.ofPattern(p));
+        readProps(
+                props,
+                "format.timestamp",
+                p -> timestampFormatter = DateTimeFormatter.ofPattern(p));
+    }
+
+    private void readProps(Properties properties, String settingKey, 
Consumer<String> callback) {
+        String settingValue = (String) properties.get(settingKey);
+        if (settingValue == null || settingValue.isEmpty()) {
+            return;
+        }
+        try {
+            callback.accept(settingValue.trim());
+        } catch (IllegalArgumentException | DateTimeException e) {
+            LOGGER.error("setting {} is illegal:{}", settingKey, settingValue);
+            throw e;
+        }
+    }
+
+    @Override
+    public void converterFor(
+            RelationalColumn column,
+            CustomConverter.ConverterRegistration<SchemaBuilder> registration) 
{
+        String sqlType = column.typeName().toUpperCase();
+        SchemaBuilder schemaBuilder = null;
+        CustomConverter.Converter converter = null;
+        if (DatabaseSyncConfig.UPPERCASE_DATE.equals(sqlType)) {
+            schemaBuilder = SchemaBuilder.string().optional();
+            converter = this::convertDate;
+        }
+        if (DatabaseSyncConfig.TIME.equals(sqlType)) {
+            schemaBuilder = SchemaBuilder.string().optional();
+            converter = this::convertTime;
+        }
+        if (DatabaseSyncConfig.TIMESTAMP.equals(sqlType)) {
+            schemaBuilder = SchemaBuilder.string().optional();
+            converter = this::convertTimestamp;
+        }
+        if (schemaBuilder != null) {
+            registration.register(schemaBuilder, converter);
+        }
+    }
+
+    private String convertDate(Object input) {
+        if (input instanceof LocalDate) {
+            return dateFormatter.format((LocalDate) input);
+        } else if (input instanceof Integer) {
+            LocalDate date = LocalDate.ofEpochDay((Integer) input);
+            return dateFormatter.format(date);
+        } else if (input instanceof Date) {
+            return dateFormatter.format(((Date) input).toLocalDate());
+        }
+        return null;
+    }
+
+    private String convertTime(Object input) {
+        if (input instanceof Time) {
+            return timeFormatter.format(((Time) input).toLocalTime());
+        }
+        return null;
+    }
+
+    private String convertTimestamp(Object input) {
+        if (input instanceof Timestamp) {
+            return timestampFormatter.format(((Timestamp) 
input).toLocalDateTime());
+        } else if (input instanceof Instant) {
+            LocalDateTime ldt = LocalDateTime.ofInstant(((Instant) input), 
ZoneOffset.UTC);
+            return timestampFormatter.format(ldt);
+        }
+        return null;
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java
similarity index 51%
copy from 
flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java
copy to 
flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java
index 27e9600e..5aaf8cea 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java
@@ -15,22 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.flink.tools.cdc;
+package org.apache.doris.flink.tools.cdc.db2;
 
-public enum SourceConnector {
-    MYSQL("mysql"),
-    ORACLE("oracle"),
-    POSTGRES("postgres"),
-    SQLSERVER("sqlserver"),
-    MONGODB("mongodb");
+import org.apache.doris.flink.tools.cdc.JdbcSourceSchema;
 
-    public final String connectorName;
+import java.sql.DatabaseMetaData;
 
-    SourceConnector(String connectorName) {
-        this.connectorName = connectorName;
+public class Db2Schema extends JdbcSourceSchema {
+    public Db2Schema(
+            DatabaseMetaData metaData,
+            String databaseName,
+            String schemaName,
+            String tableName,
+            String tableComment)
+            throws Exception {
+        super(metaData, databaseName, schemaName, tableName, tableComment);
     }
 
-    public String getConnectorName() {
-        return connectorName;
+    @Override
+    public String convertToDorisType(String fieldType, Integer precision, 
Integer scale) {
+        return Db2Type.toDorisType(fieldType, precision, scale);
+    }
+
+    @Override
+    public String getCdcTableName() {
+        return schemaName + "\\." + tableName;
     }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Type.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Type.java
new file mode 100644
index 00000000..1255d1e7
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Type.java
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.db2;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.doris.flink.catalog.doris.DorisType;
+
+public class Db2Type {
+    private static final String BOOLEAN = "BOOLEAN";
+    private static final String SMALLINT = "SMALLINT";
+    private static final String INTEGER = "INTEGER";
+    private static final String INT = "INT";
+    private static final String BIGINT = "BIGINT";
+    private static final String REAL = "REAL";
+    private static final String DECFLOAT = "DECFLOAT";
+    private static final String DOUBLE = "DOUBLE";
+    private static final String DECIMAL = "DECIMAL";
+    private static final String NUMERIC = "NUMERIC";
+    private static final String DATE = "DATE";
+    private static final String TIME = "TIME";
+    private static final String TIMESTAMP = "TIMESTAMP";
+    private static final String CHARACTER = "CHARACTER";
+    private static final String CHAR = "CHAR";
+    private static final String LONG_VARCHAR = "LONG VARCHAR";
+    private static final String VARCHAR = "VARCHAR";
+    private static final String XML = "XML";
+    private static final String VARGRAPHIC = "VARGRAPHIC";
+
+    public static String toDorisType(String db2Type, Integer precision, 
Integer scale) {
+        db2Type = db2Type.toUpperCase();
+        switch (db2Type) {
+            case BOOLEAN:
+                return DorisType.BOOLEAN;
+            case SMALLINT:
+                return DorisType.SMALLINT;
+            case INTEGER:
+            case INT:
+                return DorisType.INT;
+            case BIGINT:
+                return DorisType.BIGINT;
+            case REAL:
+                return DorisType.FLOAT;
+            case DOUBLE:
+                return DorisType.DOUBLE;
+            case DATE:
+                return DorisType.DATE_V2;
+            case DECFLOAT:
+            case DECIMAL:
+            case NUMERIC:
+                if (precision != null && precision > 0 && precision <= 38) {
+                    if (scale != null && scale >= 0) {
+                        return String.format("%s(%s,%s)", 
DorisType.DECIMAL_V3, precision, scale);
+                    }
+                    return String.format("%s(%s,%s)", DorisType.DECIMAL_V3, 
precision, 0);
+                } else {
+                    return DorisType.STRING;
+                }
+            case CHARACTER:
+            case CHAR:
+            case VARCHAR:
+            case LONG_VARCHAR:
+                Preconditions.checkNotNull(precision);
+                return precision * 3 > 65533
+                        ? DorisType.STRING
+                        : String.format("%s(%s)", DorisType.VARCHAR, precision 
* 3);
+            case TIMESTAMP:
+                return String.format(
+                        "%s(%s)", DorisType.DATETIME_V2, Math.min(scale == 
null ? 0 : scale, 6));
+            case TIME:
+            case VARGRAPHIC:
+                // Currently, the Flink CDC connector does not support the XML 
data type from DB2.
+                // Case XML:
+                return DorisType.STRING;
+            default:
+                throw new UnsupportedOperationException("Unsupported DB2 Type: 
" + db2Type);
+        }
+    }
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
new file mode 100644
index 00000000..77b8931d
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.apache.doris.flink.tools.cdc.db2.Db2DatabaseSync;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcDb2SyncDatabaseCase {
+
+    public static void main(String[] args) throws Exception {
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.disableOperatorChaining();
+        env.enableCheckpointing(10000);
+
+        //  Map<String,String> flinkMap = new HashMap<>();
+        //  flinkMap.put("execution.checkpointing.interval","10s");
+        //  flinkMap.put("pipeline.operator-chaining","false");
+        //  flinkMap.put("parallelism.default","1");
+
+        //  Configuration configuration = Configuration.fromMap(flinkMap);
+        //  env.configure(configuration);
+
+        String database = "db2_test";
+        String tablePrefix = "";
+        String tableSuffix = "";
+        Map<String, String> sourceConfig = new HashMap<>();
+        sourceConfig.put("database-name", "testdb");
+        sourceConfig.put("schema-name", "DB2INST1");
+        sourceConfig.put("hostname", "127.0.0.1");
+        sourceConfig.put("port", "50000");
+        sourceConfig.put("username", "db2inst1");
+        sourceConfig.put("password", "=doris123456");
+        // 
sourceConfig.put("debezium.database.tablename.case.insensitive","false");
+        sourceConfig.put("scan.incremental.snapshot.enabled", "true");
+        // sourceConfig.put("debezium.include.schema.changes","false");
+
+        Configuration config = Configuration.fromMap(sourceConfig);
+
+        Map<String, String> sinkConfig = new HashMap<>();
+        sinkConfig.put("fenodes", "127.0.0.1:8030");
+        // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 
10.20.30.3:8040");
+        sinkConfig.put("username", "root");
+        sinkConfig.put("password", "123456");
+        sinkConfig.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030");
+        sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+        Configuration sinkConf = Configuration.fromMap(sinkConfig);
+
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put("replication_num", "1");
+        //        tableConfig.put("table-buckets", 
"tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
+        String includingTables = "FULL_TYPES";
+        String excludingTables = null;
+        String multiToOneOrigin = null;
+        String multiToOneTarget = null;
+        boolean ignoreDefaultValue = false;
+        boolean useNewSchemaChange = true;
+        boolean singleSink = false;
+        boolean ignoreIncompatible = false;
+        DatabaseSync databaseSync = new Db2DatabaseSync();
+        databaseSync
+                .setEnv(env)
+                .setDatabase(database)
+                .setConfig(config)
+                .setTablePrefix(tablePrefix)
+                .setTableSuffix(tableSuffix)
+                .setIncludingTables(includingTables)
+                .setExcludingTables(excludingTables)
+                .setMultiToOneOrigin(multiToOneOrigin)
+                .setMultiToOneTarget(multiToOneTarget)
+                .setIgnoreDefaultValue(ignoreDefaultValue)
+                .setSinkConfig(sinkConf)
+                .setTableConfig(tableConfig)
+                .setCreateTableOnly(false)
+                .setNewSchemaChange(useNewSchemaChange)
+                .setSingleSink(singleSink)
+                .setIgnoreIncompatible(ignoreIncompatible)
+                .create();
+        databaseSync.build();
+        env.execute(String.format("DB2-Doris Database Sync: %s", database));
+    }
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/db2/Db2TypeTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/db2/Db2TypeTest.java
new file mode 100644
index 00000000..22656902
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/db2/Db2TypeTest.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.db2;
+
+import org.apache.doris.flink.catalog.doris.DorisType;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class Db2TypeTest {
+    @Test
+    public void db2FullTypeTest() {
+        assertEquals(DorisType.BOOLEAN, Db2Type.toDorisType("BOOLEAN", 1, 
null));
+        assertEquals(DorisType.SMALLINT, Db2Type.toDorisType("SMALLINT", 5, 
0));
+        assertEquals(DorisType.INT, Db2Type.toDorisType("INTEGER", 10, 0));
+        assertEquals(DorisType.BIGINT, Db2Type.toDorisType("BIGINT", 10, 0));
+        assertEquals(DorisType.FLOAT, Db2Type.toDorisType("REAL", 24, null));
+        assertEquals(DorisType.DOUBLE, Db2Type.toDorisType("DOUBLE", 53, 
null));
+        assertEquals("DECIMALV3(34,0)", Db2Type.toDorisType("DECFLOAT", 34, 
null));
+        assertEquals("DECIMALV3(31,0)", Db2Type.toDorisType("DECIMAL", 31, 0));
+        assertEquals("DECIMALV3(31,31)", Db2Type.toDorisType("DECIMAL", 31, 
31));
+        assertEquals("DECIMALV3(31,0)", Db2Type.toDorisType("NUMERIC", 31, 0));
+        assertEquals("DECIMALV3(31,31)", Db2Type.toDorisType("NUMERIC", 31, 
31));
+        assertEquals("VARCHAR(600)", Db2Type.toDorisType("VARCHAR", 200, 
null));
+        assertEquals(DorisType.STRING, Db2Type.toDorisType("VARCHAR", 32672, 
null));
+        assertEquals(DorisType.VARCHAR + "(3)", Db2Type.toDorisType("CHAR", 1, 
null));
+        assertEquals(DorisType.VARCHAR + "(765)", Db2Type.toDorisType("CHAR", 
255, null));
+        assertEquals(DorisType.DATETIME_V2 + "(0)", 
Db2Type.toDorisType("TIMESTAMP", 26, 0));
+        assertEquals(DorisType.DATETIME_V2 + "(6)", 
Db2Type.toDorisType("TIMESTAMP", 26, 6));
+        assertEquals(DorisType.DATETIME_V2 + "(6)", 
Db2Type.toDorisType("TIMESTAMP", 26, 9));
+        assertEquals(DorisType.DATE_V2, Db2Type.toDorisType("DATE", 10, null));
+        assertEquals(DorisType.STRING, Db2Type.toDorisType("TIME", 8, 0));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to