This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new f559ea2c [Feature](cdc) add DB2 database sync (#316) f559ea2c is described below commit f559ea2cda6d367260b73a209898aa4af08bf6de Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Fri Jul 26 10:44:26 2024 +0800 [Feature](cdc) add DB2 database sync (#316) --- flink-doris-connector/pom.xml | 12 ++ .../jsondebezium/JsonDebeziumChangeUtils.java | 4 + .../org/apache/doris/flink/tools/cdc/CdcTools.java | 13 ++ .../doris/flink/tools/cdc/DatabaseSyncConfig.java | 2 + .../doris/flink/tools/cdc/SourceConnector.java | 3 +- .../doris/flink/tools/cdc/db2/Db2DatabaseSync.java | 227 +++++++++++++++++++++ .../flink/tools/cdc/db2/Db2DateConverter.java | 133 ++++++++++++ .../{SourceConnector.java => db2/Db2Schema.java} | 32 +-- .../apache/doris/flink/tools/cdc/db2/Db2Type.java | 94 +++++++++ .../flink/tools/cdc/CdcDb2SyncDatabaseCase.java | 104 ++++++++++ .../doris/flink/tools/cdc/db2/Db2TypeTest.java | 49 +++++ 11 files changed, 660 insertions(+), 13 deletions(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index b6e90bea..2d7b2875 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -286,6 +286,18 @@ under the License. </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-connector-db2-cdc</artifactId> + <version>${flink.sql.cdc.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <artifactId>flink-shaded-guava</artifactId> + <groupId>org.apache.flink</groupId> + </exclusion> + </exclusions> + </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-sql-connector-mongodb-cdc</artifactId> diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java index 571bacfb..492a7d29 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeUtils.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.node.NullNode; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.tools.cdc.SourceConnector; import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.apache.doris.flink.tools.cdc.db2.Db2Type; import org.apache.doris.flink.tools.cdc.mysql.MysqlType; import org.apache.doris.flink.tools.cdc.oracle.OracleType; import org.apache.doris.flink.tools.cdc.postgres.PostgresType; @@ -84,6 +85,9 @@ public class JsonDebeziumChangeUtils { case SQLSERVER: dorisTypeName = SqlServerType.toDorisType(dataType, length, scale); break; + case DB2: + dorisTypeName = Db2Type.toDorisType(dataType, length, scale); + break; default: String errMsg = sourceConnector + " not support " + dataType + " schema change."; throw new UnsupportedOperationException(errMsg); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java index 3ab38a19..b62f0f52 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; +import org.apache.doris.flink.tools.cdc.db2.Db2DatabaseSync; import org.apache.doris.flink.tools.cdc.mongodb.MongoDBDatabaseSync; import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync; import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync; @@ -61,6 +62,9 @@ public class CdcTools { case DatabaseSyncConfig.MONGODB_SYNC_DATABASE: createMongoDBSyncDatabase(opArgs); break; + case DatabaseSyncConfig.DB2_SYNC_DATABASE: + createDb2SyncDatabase(opArgs); + break; default: System.out.println("Unknown operation " + operation); System.exit(1); @@ -112,6 +116,15 @@ public class CdcTools { syncDatabase(params, databaseSync, mongoConfig, SourceConnector.MONGODB); } + private static void createDb2SyncDatabase(String[] opArgs) throws Exception { + MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); + Preconditions.checkArgument(params.has(DatabaseSyncConfig.DB2_CONF)); + Map<String, String> db2Map = getConfigMap(params, DatabaseSyncConfig.DB2_CONF); + Configuration db2Config = Configuration.fromMap(db2Map); + DatabaseSync databaseSync = new Db2DatabaseSync(); + syncDatabase(params, databaseSync, db2Config, SourceConnector.DB2); + } + private static void syncDatabase( MultipleParameterTool params, DatabaseSync databaseSync, diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java index 62b77645..6c78d5cd 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSyncConfig.java @@ -24,12 +24,14 @@ public class DatabaseSyncConfig { public static final String POSTGRES_SYNC_DATABASE = "postgres-sync-database"; public static final String SQLSERVER_SYNC_DATABASE = "sqlserver-sync-database"; public static final String MONGODB_SYNC_DATABASE = "mongodb-sync-database"; + public static final String DB2_SYNC_DATABASE = "db2-sync-database"; public static final String MYSQL_CONF = "mysql-conf"; public static final String ORACLE_CONF = "oracle-conf"; public static final String POSTGRES_CONF = "postgres-conf"; public static final String SQLSERVER_CONF = "sqlserver-conf"; public static final String MONGODB_CONF = "mongodb-conf"; + public static final String DB2_CONF = "db2-conf"; ///////////// source-conf //////// public static final String DATABASE_NAME = "database-name"; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java index 27e9600e..47c8dfba 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java @@ -22,7 +22,8 @@ public enum SourceConnector { ORACLE("oracle"), POSTGRES("postgres"), SQLSERVER("sqlserver"), - MONGODB("mongodb"); + MONGODB("mongodb"), + DB2("db2"); public final String connectorName; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java new file mode 100644 index 00000000..2dcd21a9 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DatabaseSync.java @@ -0,0 +1,227 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.tools.cdc.db2; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions; +import org.apache.flink.cdc.connectors.base.options.SourceOptions; +import org.apache.flink.cdc.connectors.base.options.StartupOptions; +import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; +import org.apache.flink.cdc.connectors.db2.Db2Source; +import org.apache.flink.cdc.connectors.db2.source.Db2SourceBuilder; +import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; +import org.apache.flink.cdc.debezium.DebeziumSourceFunction; +import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema; +import org.apache.flink.cdc.debezium.table.DebeziumOptions; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.Preconditions; + +import org.apache.doris.flink.catalog.doris.DataModel; +import org.apache.doris.flink.tools.cdc.DatabaseSync; +import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.apache.doris.flink.tools.cdc.deserialize.DorisJsonDebeziumDeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECTION_POOL_SIZE; +import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_MAX_RETRIES; +import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT; +import static org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions.SERVER_TIME_ZONE; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; +import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; + +public class Db2DatabaseSync extends DatabaseSync { + public static final ConfigOption<Integer> PORT = + ConfigOptions.key("port") + .intType() + .defaultValue(50000) + .withDescription("Integer port number of the DB2 database server."); + private static final Logger LOG = LoggerFactory.getLogger(Db2DatabaseSync.class); + + private static final String JDBC_URL = "jdbc:db2://%s:%d/%s"; + + public Db2DatabaseSync() throws SQLException { + super(); + } + + @Override + public void registerDriver() throws SQLException { + try { + Class.forName("com.ibm.db2.jcc.DB2Driver"); + LOG.info(" Loaded the JDBC driver"); + } catch (ClassNotFoundException ex) { + throw new SQLException( + "No suitable driver found, can not found class com.ibm.db2.jcc.DB2Driver"); + } + } + + @Override + public Connection getConnection() throws SQLException { + String jdbcUrl = + String.format( + JDBC_URL, + config.get(JdbcSourceOptions.HOSTNAME), + config.get(PORT), + config.get(JdbcSourceOptions.DATABASE_NAME)); + Properties pro = new Properties(); + pro.setProperty("user", config.get(JdbcSourceOptions.USERNAME)); + pro.setProperty("password", config.get(JdbcSourceOptions.PASSWORD)); + return DriverManager.getConnection(jdbcUrl, pro); + } + + @Override + public List<SourceSchema> getSchemaList() throws Exception { + String databaseName = config.get(JdbcSourceOptions.DATABASE_NAME); + String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME); + List<SourceSchema> schemaList = new ArrayList<>(); + LOG.info("database-name {}, schema-name {}", databaseName, schemaName); + try (Connection conn = getConnection()) { + DatabaseMetaData metaData = conn.getMetaData(); + try (ResultSet tables = + metaData.getTables(null, schemaName, "%", new String[] {"TABLE"})) { + while (tables.next()) { + String tableName = tables.getString("TABLE_NAME"); + String tableComment = tables.getString("REMARKS"); + if (!isSyncNeeded(tableName)) { + continue; + } + SourceSchema sourceSchema = + new Db2Schema( + metaData, databaseName, schemaName, tableName, tableComment); + sourceSchema.setModel( + !sourceSchema.primaryKeys.isEmpty() + ? DataModel.UNIQUE + : DataModel.DUPLICATE); + schemaList.add(sourceSchema); + } + } + } + return schemaList; + } + + @Override + public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) { + String databaseName = config.get(JdbcSourceOptions.DATABASE_NAME); + String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME); + Preconditions.checkNotNull(databaseName, "database-name in DB2 is required"); + Preconditions.checkNotNull(schemaName, "schema-name in DB2 is required"); + + String tableName = config.get(JdbcSourceOptions.TABLE_NAME); + String hostname = config.get(JdbcSourceOptions.HOSTNAME); + Integer port = config.get(PORT); + String username = config.get(JdbcSourceOptions.USERNAME); + String password = config.get(JdbcSourceOptions.PASSWORD); + + StartupOptions startupOptions = StartupOptions.initial(); + String startupMode = config.get(SourceOptions.SCAN_STARTUP_MODE); + if ("initial".equalsIgnoreCase(startupMode)) { + startupOptions = StartupOptions.initial(); + } else if ("latest-offset".equalsIgnoreCase(startupMode)) { + startupOptions = StartupOptions.latest(); + } + + // debezium properties set + Properties debeziumProperties = new Properties(); + debeziumProperties.putAll(Db2DateConverter.DEFAULT_PROPS); + debeziumProperties.put("decimal.handling.mode", "string"); + + for (Map.Entry<String, String> entry : config.toMap().entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) { + debeziumProperties.put( + key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value); + } + } + + DebeziumDeserializationSchema<String> schema; + if (ignoreDefaultValue) { + schema = new DorisJsonDebeziumDeserializationSchema(); + } else { + Map<String, Object> customConverterConfigs = new HashMap<>(); + schema = new JsonDebeziumDeserializationSchema(false, customConverterConfigs); + } + + if (config.getBoolean(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, true)) { + JdbcIncrementalSource<String> db2IncrementalSource = + Db2SourceBuilder.Db2IncrementalSource.<String>builder() + .hostname(hostname) + .port(port) + .databaseList(databaseName) + .tableList(tableName) + .username(username) + .password(password) + .deserializer(schema) + .debeziumProperties(debeziumProperties) + .startupOptions(startupOptions) + .includeSchemaChanges(true) + .debeziumProperties(debeziumProperties) + .serverTimeZone(config.get(SERVER_TIME_ZONE)) + .splitSize(config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE)) + .splitMetaGroupSize(config.get(CHUNK_META_GROUP_SIZE)) + .fetchSize(config.get(SCAN_SNAPSHOT_FETCH_SIZE)) + .connectTimeout(config.get(CONNECT_TIMEOUT)) + .connectionPoolSize(config.get(CONNECTION_POOL_SIZE)) + .connectMaxRetries(config.get(CONNECT_MAX_RETRIES)) + .distributionFactorUpper( + config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND)) + .distributionFactorLower( + config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND)) + .build(); + return env.fromSource( + db2IncrementalSource, WatermarkStrategy.noWatermarks(), "Db2IncrementalSource"); + + } else { + DebeziumSourceFunction<String> db2Source = + Db2Source.<String>builder() + .hostname(hostname) + .port(port) + .database(databaseName) + .tableList(tableName) + .username(username) + .password(password) + .debeziumProperties(debeziumProperties) + .startupOptions(startupOptions) + .deserializer(schema) + .build(); + return env.addSource(db2Source, "Db2 Source"); + } + } + + @Override + public String getTableListPrefix() { + return config.get(JdbcSourceOptions.SCHEMA_NAME); + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DateConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DateConverter.java new file mode 100644 index 00000000..9d681d8a --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2DateConverter.java @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.tools.cdc.db2; + +import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder; + +import io.debezium.spi.converter.CustomConverter; +import io.debezium.spi.converter.RelationalColumn; +import org.apache.doris.flink.tools.cdc.DatabaseSyncConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.DateTimeException; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Properties; +import java.util.function.Consumer; + +public class Db2DateConverter implements CustomConverter<SchemaBuilder, RelationalColumn> { + private static final Logger LOGGER = LoggerFactory.getLogger(Db2DateConverter.class); + private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE; + private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME; + private final DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME; + + protected static final Properties DEFAULT_PROPS = new Properties(); + + static { + DEFAULT_PROPS.setProperty(DatabaseSyncConfig.CONVERTERS, DatabaseSyncConfig.DATE); + DEFAULT_PROPS.setProperty( + DatabaseSyncConfig.DATE_TYPE, + "org.apache.doris.flink.tools.cdc.db2.Db2DateConverter"); + DEFAULT_PROPS.setProperty( + DatabaseSyncConfig.DATE_FORMAT_DATE, DatabaseSyncConfig.YEAR_MONTH_DAY_FORMAT); + DEFAULT_PROPS.setProperty( + DatabaseSyncConfig.DATE_FORMAT_TIMESTAMP, DatabaseSyncConfig.DATETIME_MICRO_FORMAT); + } + + @Override + public void configure(Properties props) { + readProps(props, "format.date", p -> dateFormatter = DateTimeFormatter.ofPattern(p)); + readProps( + props, + "format.timestamp", + p -> timestampFormatter = DateTimeFormatter.ofPattern(p)); + } + + private void readProps(Properties properties, String settingKey, Consumer<String> callback) { + String settingValue = (String) properties.get(settingKey); + if (settingValue == null || settingValue.isEmpty()) { + return; + } + try { + callback.accept(settingValue.trim()); + } catch (IllegalArgumentException | DateTimeException e) { + LOGGER.error("setting {} is illegal:{}", settingKey, settingValue); + throw e; + } + } + + @Override + public void converterFor( + RelationalColumn column, + CustomConverter.ConverterRegistration<SchemaBuilder> registration) { + String sqlType = column.typeName().toUpperCase(); + SchemaBuilder schemaBuilder = null; + CustomConverter.Converter converter = null; + if (DatabaseSyncConfig.UPPERCASE_DATE.equals(sqlType)) { + schemaBuilder = SchemaBuilder.string().optional(); + converter = this::convertDate; + } + if (DatabaseSyncConfig.TIME.equals(sqlType)) { + schemaBuilder = SchemaBuilder.string().optional(); + converter = this::convertTime; + } + if (DatabaseSyncConfig.TIMESTAMP.equals(sqlType)) { + schemaBuilder = SchemaBuilder.string().optional(); + converter = this::convertTimestamp; + } + if (schemaBuilder != null) { + registration.register(schemaBuilder, converter); + } + } + + private String convertDate(Object input) { + if (input instanceof LocalDate) { + return dateFormatter.format((LocalDate) input); + } else if (input instanceof Integer) { + LocalDate date = LocalDate.ofEpochDay((Integer) input); + return dateFormatter.format(date); + } else if (input instanceof Date) { + return dateFormatter.format(((Date) input).toLocalDate()); + } + return null; + } + + private String convertTime(Object input) { + if (input instanceof Time) { + return timeFormatter.format(((Time) input).toLocalTime()); + } + return null; + } + + private String convertTimestamp(Object input) { + if (input instanceof Timestamp) { + return timestampFormatter.format(((Timestamp) input).toLocalDateTime()); + } else if (input instanceof Instant) { + LocalDateTime ldt = LocalDateTime.ofInstant(((Instant) input), ZoneOffset.UTC); + return timestampFormatter.format(ldt); + } + return null; + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java similarity index 51% copy from flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java copy to flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java index 27e9600e..5aaf8cea 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java @@ -15,22 +15,30 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.flink.tools.cdc; +package org.apache.doris.flink.tools.cdc.db2; -public enum SourceConnector { - MYSQL("mysql"), - ORACLE("oracle"), - POSTGRES("postgres"), - SQLSERVER("sqlserver"), - MONGODB("mongodb"); +import org.apache.doris.flink.tools.cdc.JdbcSourceSchema; - public final String connectorName; +import java.sql.DatabaseMetaData; - SourceConnector(String connectorName) { - this.connectorName = connectorName; +public class Db2Schema extends JdbcSourceSchema { + public Db2Schema( + DatabaseMetaData metaData, + String databaseName, + String schemaName, + String tableName, + String tableComment) + throws Exception { + super(metaData, databaseName, schemaName, tableName, tableComment); } - public String getConnectorName() { - return connectorName; + @Override + public String convertToDorisType(String fieldType, Integer precision, Integer scale) { + return Db2Type.toDorisType(fieldType, precision, scale); + } + + @Override + public String getCdcTableName() { + return schemaName + "\\." + tableName; } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Type.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Type.java new file mode 100644 index 00000000..1255d1e7 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Type.java @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.tools.cdc.db2; + +import org.apache.flink.util.Preconditions; + +import org.apache.doris.flink.catalog.doris.DorisType; + +public class Db2Type { + private static final String BOOLEAN = "BOOLEAN"; + private static final String SMALLINT = "SMALLINT"; + private static final String INTEGER = "INTEGER"; + private static final String INT = "INT"; + private static final String BIGINT = "BIGINT"; + private static final String REAL = "REAL"; + private static final String DECFLOAT = "DECFLOAT"; + private static final String DOUBLE = "DOUBLE"; + private static final String DECIMAL = "DECIMAL"; + private static final String NUMERIC = "NUMERIC"; + private static final String DATE = "DATE"; + private static final String TIME = "TIME"; + private static final String TIMESTAMP = "TIMESTAMP"; + private static final String CHARACTER = "CHARACTER"; + private static final String CHAR = "CHAR"; + private static final String LONG_VARCHAR = "LONG VARCHAR"; + private static final String VARCHAR = "VARCHAR"; + private static final String XML = "XML"; + private static final String VARGRAPHIC = "VARGRAPHIC"; + + public static String toDorisType(String db2Type, Integer precision, Integer scale) { + db2Type = db2Type.toUpperCase(); + switch (db2Type) { + case BOOLEAN: + return DorisType.BOOLEAN; + case SMALLINT: + return DorisType.SMALLINT; + case INTEGER: + case INT: + return DorisType.INT; + case BIGINT: + return DorisType.BIGINT; + case REAL: + return DorisType.FLOAT; + case DOUBLE: + return DorisType.DOUBLE; + case DATE: + return DorisType.DATE_V2; + case DECFLOAT: + case DECIMAL: + case NUMERIC: + if (precision != null && precision > 0 && precision <= 38) { + if (scale != null && scale >= 0) { + return String.format("%s(%s,%s)", DorisType.DECIMAL_V3, precision, scale); + } + return String.format("%s(%s,%s)", DorisType.DECIMAL_V3, precision, 0); + } else { + return DorisType.STRING; + } + case CHARACTER: + case CHAR: + case VARCHAR: + case LONG_VARCHAR: + Preconditions.checkNotNull(precision); + return precision * 3 > 65533 + ? DorisType.STRING + : String.format("%s(%s)", DorisType.VARCHAR, precision * 3); + case TIMESTAMP: + return String.format( + "%s(%s)", DorisType.DATETIME_V2, Math.min(scale == null ? 0 : scale, 6)); + case TIME: + case VARGRAPHIC: + // Currently, the Flink CDC connector does not support the XML data type from DB2. + // Case XML: + return DorisType.STRING; + default: + throw new UnsupportedOperationException("Unsupported DB2 Type: " + db2Type); + } + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java new file mode 100644 index 00000000..77b8931d --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.tools.cdc; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.apache.doris.flink.tools.cdc.db2.Db2DatabaseSync; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +public class CdcDb2SyncDatabaseCase { + + public static void main(String[] args) throws Exception { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.disableOperatorChaining(); + env.enableCheckpointing(10000); + + // Map<String,String> flinkMap = new HashMap<>(); + // flinkMap.put("execution.checkpointing.interval","10s"); + // flinkMap.put("pipeline.operator-chaining","false"); + // flinkMap.put("parallelism.default","1"); + + // Configuration configuration = Configuration.fromMap(flinkMap); + // env.configure(configuration); + + String database = "db2_test"; + String tablePrefix = ""; + String tableSuffix = ""; + Map<String, String> sourceConfig = new HashMap<>(); + sourceConfig.put("database-name", "testdb"); + sourceConfig.put("schema-name", "DB2INST1"); + sourceConfig.put("hostname", "127.0.0.1"); + sourceConfig.put("port", "50000"); + sourceConfig.put("username", "db2inst1"); + sourceConfig.put("password", "=doris123456"); + // sourceConfig.put("debezium.database.tablename.case.insensitive","false"); + sourceConfig.put("scan.incremental.snapshot.enabled", "true"); + // sourceConfig.put("debezium.include.schema.changes","false"); + + Configuration config = Configuration.fromMap(sourceConfig); + + Map<String, String> sinkConfig = new HashMap<>(); + sinkConfig.put("fenodes", "127.0.0.1:8030"); + // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040"); + sinkConfig.put("username", "root"); + sinkConfig.put("password", "123456"); + sinkConfig.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030"); + sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString()); + Configuration sinkConf = Configuration.fromMap(sinkConfig); + + Map<String, String> tableConfig = new HashMap<>(); + tableConfig.put("replication_num", "1"); + // tableConfig.put("table-buckets", "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"); + String includingTables = "FULL_TYPES"; + String excludingTables = null; + String multiToOneOrigin = null; + String multiToOneTarget = null; + boolean ignoreDefaultValue = false; + boolean useNewSchemaChange = true; + boolean singleSink = false; + boolean ignoreIncompatible = false; + DatabaseSync databaseSync = new Db2DatabaseSync(); + databaseSync + .setEnv(env) + .setDatabase(database) + .setConfig(config) + .setTablePrefix(tablePrefix) + .setTableSuffix(tableSuffix) + .setIncludingTables(includingTables) + .setExcludingTables(excludingTables) + .setMultiToOneOrigin(multiToOneOrigin) + .setMultiToOneTarget(multiToOneTarget) + .setIgnoreDefaultValue(ignoreDefaultValue) + .setSinkConfig(sinkConf) + .setTableConfig(tableConfig) + .setCreateTableOnly(false) + .setNewSchemaChange(useNewSchemaChange) + .setSingleSink(singleSink) + .setIgnoreIncompatible(ignoreIncompatible) + .create(); + databaseSync.build(); + env.execute(String.format("DB2-Doris Database Sync: %s", database)); + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/db2/Db2TypeTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/db2/Db2TypeTest.java new file mode 100644 index 00000000..22656902 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/db2/Db2TypeTest.java @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.tools.cdc.db2; + +import org.apache.doris.flink.catalog.doris.DorisType; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class Db2TypeTest { + @Test + public void db2FullTypeTest() { + assertEquals(DorisType.BOOLEAN, Db2Type.toDorisType("BOOLEAN", 1, null)); + assertEquals(DorisType.SMALLINT, Db2Type.toDorisType("SMALLINT", 5, 0)); + assertEquals(DorisType.INT, Db2Type.toDorisType("INTEGER", 10, 0)); + assertEquals(DorisType.BIGINT, Db2Type.toDorisType("BIGINT", 10, 0)); + assertEquals(DorisType.FLOAT, Db2Type.toDorisType("REAL", 24, null)); + assertEquals(DorisType.DOUBLE, Db2Type.toDorisType("DOUBLE", 53, null)); + assertEquals("DECIMALV3(34,0)", Db2Type.toDorisType("DECFLOAT", 34, null)); + assertEquals("DECIMALV3(31,0)", Db2Type.toDorisType("DECIMAL", 31, 0)); + assertEquals("DECIMALV3(31,31)", Db2Type.toDorisType("DECIMAL", 31, 31)); + assertEquals("DECIMALV3(31,0)", Db2Type.toDorisType("NUMERIC", 31, 0)); + assertEquals("DECIMALV3(31,31)", Db2Type.toDorisType("NUMERIC", 31, 31)); + assertEquals("VARCHAR(600)", Db2Type.toDorisType("VARCHAR", 200, null)); + assertEquals(DorisType.STRING, Db2Type.toDorisType("VARCHAR", 32672, null)); + assertEquals(DorisType.VARCHAR + "(3)", Db2Type.toDorisType("CHAR", 1, null)); + assertEquals(DorisType.VARCHAR + "(765)", Db2Type.toDorisType("CHAR", 255, null)); + assertEquals(DorisType.DATETIME_V2 + "(0)", Db2Type.toDorisType("TIMESTAMP", 26, 0)); + assertEquals(DorisType.DATETIME_V2 + "(6)", Db2Type.toDorisType("TIMESTAMP", 26, 6)); + assertEquals(DorisType.DATETIME_V2 + "(6)", Db2Type.toDorisType("TIMESTAMP", 26, 9)); + assertEquals(DorisType.DATE_V2, Db2Type.toDorisType("DATE", 10, null)); + assertEquals(DorisType.STRING, Db2Type.toDorisType("TIME", 8, 0)); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org