This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 060e13da [feature](cdc) add ignore-incompatible option (#371) 060e13da is described below commit 060e13dad0b7a7c94fee1c397739f0df29da1e2b Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Wed Jul 3 15:40:26 2024 +0800 [feature](cdc) add ignore-incompatible option (#371) --- .../doris/flink/catalog/doris/DorisSystem.java | 3 +- .../org/apache/doris/flink/tools/cdc/CdcTools.java | 2 + .../apache/doris/flink/tools/cdc/DatabaseSync.java | 65 ++++++++++++++++++---- .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 2 + .../tools/cdc/CdcOraclelSyncDatabaseCase.java | 2 + .../tools/cdc/CdcPostgresSyncDatabaseCase.java | 2 + .../tools/cdc/CdcSqlServerSyncDatabaseCase.java | 2 + 7 files changed, 64 insertions(+), 14 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java index 31d32e01..ab26e308 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java @@ -281,8 +281,7 @@ public class DorisSystem implements Serializable { } private static List<String> identifier(List<String> name) { - List<String> result = name.stream().map(m -> identifier(m)).collect(Collectors.toList()); - return result; + return name.stream().map(DorisSystem::identifier).collect(Collectors.toList()); } public static String identifier(String name) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java index 7443ef8a..38b942ea 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -132,6 +132,7 @@ public class CdcTools { String multiToOneTarget = params.get("multi-to-one-target"); boolean createTableOnly = params.has("create-table-only"); boolean ignoreDefaultValue = params.has("ignore-default-value"); + boolean ignoreIncompatible = params.has("ignore-incompatible"); boolean singleSink = params.has("single-sink"); Preconditions.checkArgument(params.has("sink-conf")); @@ -155,6 +156,7 @@ public class CdcTools { .setTableConfig(tableMap) .setCreateTableOnly(createTableOnly) .setSingleSink(singleSink) + .setIgnoreIncompatible(ignoreIncompatible) .create(); databaseSync.build(); if (StringUtils.isNullOrWhitespaceOnly(jobName)) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 691eaafa..a4d0511b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -34,6 +34,7 @@ import org.apache.doris.flink.cfg.DorisConnectionOptions; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.exception.DorisSystemException; import org.apache.doris.flink.sink.DorisSink; import org.apache.doris.flink.sink.writer.WriteMode; import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer; @@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.sql.Connection; import java.sql.SQLException; +import java.sql.SQLSyntaxErrorException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -71,6 +73,7 @@ public abstract class DatabaseSync { protected Map<String, String> tableConfig = new HashMap<>(); protected Configuration sinkConfig; protected boolean ignoreDefaultValue; + protected boolean ignoreIncompatible; public StreamExecutionEnvironment env; private boolean createTableOnly = false; @@ -128,7 +131,9 @@ public abstract class DatabaseSync { if (tableConfig.containsKey("table-buckets")) { tableBucketsMap = getTableBuckets(tableConfig.get("table-buckets")); } - Set<String> bucketsTable = new HashSet<>(); + + // Set of table names that have assigned bucket numbers. + Set<String> tablesWithBucketsAssigned = new HashSet<>(); Set<String> targetDbSet = new HashSet<>(); for (SourceSchema schema : schemaList) { syncTables.add(schema.getTableName()); @@ -147,17 +152,13 @@ public abstract class DatabaseSync { // Calculate the mapping relationship between upstream and downstream tables tableMapping.put( schema.getTableIdentifier(), String.format("%s.%s", targetDb, dorisTable)); - if (!dorisSystem.tableExists(targetDb, dorisTable)) { - TableSchema dorisSchema = schema.convertTableSchema(tableConfig); - // set doris target database - dorisSchema.setDatabase(targetDb); - dorisSchema.setTable(dorisTable); - if (tableBucketsMap != null) { - setTableSchemaBuckets(tableBucketsMap, dorisSchema, dorisTable, bucketsTable); - } - dorisSystem.createTable(dorisSchema); - } - if (!dorisTables.contains(Tuple2.of(targetDb, dorisTable))) { + if (tryCreateTableIfAbsent( + dorisSystem, + targetDb, + dorisTable, + schema, + tableBucketsMap, + tablesWithBucketsAssigned)) { dorisTables.add(Tuple2.of(targetDb, dorisTable)); } } @@ -462,6 +463,41 @@ public abstract class DatabaseSync { } } + private boolean tryCreateTableIfAbsent( + DorisSystem dorisSystem, + String targetDb, + String dorisTable, + SourceSchema schema, + Map<String, Integer> tableBucketsMap, + Set<String> tableBucketsSet) { + if (!dorisSystem.tableExists(targetDb, dorisTable)) { + TableSchema dorisSchema = schema.convertTableSchema(tableConfig); + dorisSchema.setDatabase(targetDb); + dorisSchema.setTable(dorisTable); + // set the table buckets of table + if (tableBucketsMap != null) { + setTableSchemaBuckets(tableBucketsMap, dorisSchema, dorisTable, tableBucketsSet); + } + try { + dorisSystem.createTable(dorisSchema); + return true; + } catch (Exception ex) { + handleTableCreationFailure(ex); + } + } + return false; + } + + private void handleTableCreationFailure(Exception ex) throws DorisSystemException { + if (ignoreIncompatible && ex.getCause() instanceof SQLSyntaxErrorException) { + LOG.warn( + "Doris schema and source table schema are not compatible. Error: {} ", + ex.getCause().toString()); + } else { + throw new DorisSystemException("Failed to create table due to: ", ex); + } + } + public DatabaseSync setEnv(StreamExecutionEnvironment env) { this.env = env; return this; @@ -529,6 +565,11 @@ public abstract class DatabaseSync { return this; } + public DatabaseSync setIgnoreIncompatible(boolean ignoreIncompatible) { + this.ignoreIncompatible = ignoreIncompatible; + return this; + } + public DatabaseSync setTablePrefix(String tablePrefix) { this.tablePrefix = tablePrefix; return this; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java index 88176e84..2410ddac 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java @@ -73,6 +73,7 @@ public class CdcMysqlSyncDatabaseCase { boolean ignoreDefaultValue = false; boolean useNewSchemaChange = false; boolean singleSink = false; + boolean ignoreIncompatible = false; DatabaseSync databaseSync = new MysqlDatabaseSync(); databaseSync .setEnv(env) @@ -90,6 +91,7 @@ public class CdcMysqlSyncDatabaseCase { .setCreateTableOnly(false) .setNewSchemaChange(useNewSchemaChange) .setSingleSink(singleSink) + .setIgnoreIncompatible(ignoreIncompatible) .create(); databaseSync.build(); env.execute(String.format("MySQL-Doris Database Sync: %s", database)); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java index 7bfa7477..fba5866c 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java @@ -77,6 +77,7 @@ public class CdcOraclelSyncDatabaseCase { String multiToOneTarget = "a|b"; boolean ignoreDefaultValue = false; boolean useNewSchemaChange = false; + boolean ignoreIncompatible = false; DatabaseSync databaseSync = new OracleDatabaseSync(); databaseSync .setEnv(env) @@ -93,6 +94,7 @@ public class CdcOraclelSyncDatabaseCase { .setTableConfig(tableConfig) .setCreateTableOnly(false) .setNewSchemaChange(useNewSchemaChange) + .setIgnoreIncompatible(ignoreIncompatible) .create(); databaseSync.build(); env.execute(String.format("Oracle-Doris Database Sync: %s", database)); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java index b9afc98b..6c933409 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java @@ -79,6 +79,7 @@ public class CdcPostgresSyncDatabaseCase { String multiToOneTarget = "a|b"; boolean ignoreDefaultValue = false; boolean useNewSchemaChange = false; + boolean ignoreIncompatible = false; DatabaseSync databaseSync = new PostgresDatabaseSync(); databaseSync .setEnv(env) @@ -95,6 +96,7 @@ public class CdcPostgresSyncDatabaseCase { .setTableConfig(tableConfig) .setCreateTableOnly(false) .setNewSchemaChange(useNewSchemaChange) + .setIgnoreIncompatible(ignoreIncompatible) .create(); databaseSync.build(); env.execute(String.format("Postgres-Doris Database Sync: %s", database)); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java index 912bcfc7..9fec63b6 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java @@ -77,6 +77,7 @@ public class CdcSqlServerSyncDatabaseCase { String multiToOneTarget = "a|b"; boolean ignoreDefaultValue = false; boolean useNewSchemaChange = false; + boolean ignoreIncompatible = false; DatabaseSync databaseSync = new SqlServerDatabaseSync(); databaseSync .setEnv(env) @@ -93,6 +94,7 @@ public class CdcSqlServerSyncDatabaseCase { .setTableConfig(tableConfig) .setCreateTableOnly(false) .setNewSchemaChange(useNewSchemaChange) + .setIgnoreIncompatible(ignoreIncompatible) .create(); databaseSync.build(); env.execute(String.format("SqlServer-Doris Database Sync: %s", database)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org