This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 060e13da [feature](cdc) add ignore-incompatible option (#371)
060e13da is described below

commit 060e13dad0b7a7c94fee1c397739f0df29da1e2b
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Wed Jul 3 15:40:26 2024 +0800

    [feature](cdc) add ignore-incompatible option (#371)
---
 .../doris/flink/catalog/doris/DorisSystem.java     |  3 +-
 .../org/apache/doris/flink/tools/cdc/CdcTools.java |  2 +
 .../apache/doris/flink/tools/cdc/DatabaseSync.java | 65 ++++++++++++++++++----
 .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java  |  2 +
 .../tools/cdc/CdcOraclelSyncDatabaseCase.java      |  2 +
 .../tools/cdc/CdcPostgresSyncDatabaseCase.java     |  2 +
 .../tools/cdc/CdcSqlServerSyncDatabaseCase.java    |  2 +
 7 files changed, 64 insertions(+), 14 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 31d32e01..ab26e308 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -281,8 +281,7 @@ public class DorisSystem implements Serializable {
     }
 
     private static List<String> identifier(List<String> name) {
-        List<String> result = name.stream().map(m -> identifier(m)).collect(Collectors.toList());
-        return result;
+        return name.stream().map(DorisSystem::identifier).collect(Collectors.toList());
     }
 
     public static String identifier(String name) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 7443ef8a..38b942ea 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -132,6 +132,7 @@ public class CdcTools {
         String multiToOneTarget = params.get("multi-to-one-target");
         boolean createTableOnly = params.has("create-table-only");
         boolean ignoreDefaultValue = params.has("ignore-default-value");
+        boolean ignoreIncompatible = params.has("ignore-incompatible");
         boolean singleSink = params.has("single-sink");
 
         Preconditions.checkArgument(params.has("sink-conf"));
@@ -155,6 +156,7 @@ public class CdcTools {
                 .setTableConfig(tableMap)
                 .setCreateTableOnly(createTableOnly)
                 .setSingleSink(singleSink)
+                .setIgnoreIncompatible(ignoreIncompatible)
                 .create();
         databaseSync.build();
         if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 691eaafa..a4d0511b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -34,6 +34,7 @@ import org.apache.doris.flink.cfg.DorisConnectionOptions;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.exception.DorisSystemException;
 import org.apache.doris.flink.sink.DorisSink;
 import org.apache.doris.flink.sink.writer.WriteMode;
 import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
@@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory;
 import java.io.Serializable;
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.sql.SQLSyntaxErrorException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -71,6 +73,7 @@ public abstract class DatabaseSync {
     protected Map<String, String> tableConfig = new HashMap<>();
     protected Configuration sinkConfig;
     protected boolean ignoreDefaultValue;
+    protected boolean ignoreIncompatible;
 
     public StreamExecutionEnvironment env;
     private boolean createTableOnly = false;
@@ -128,7 +131,9 @@ public abstract class DatabaseSync {
         if (tableConfig.containsKey("table-buckets")) {
             tableBucketsMap = getTableBuckets(tableConfig.get("table-buckets"));
         }
-        Set<String> bucketsTable = new HashSet<>();
+
+        // Set of table names that have assigned bucket numbers.
+        Set<String> tablesWithBucketsAssigned = new HashSet<>();
         Set<String> targetDbSet = new HashSet<>();
         for (SourceSchema schema : schemaList) {
             syncTables.add(schema.getTableName());
@@ -147,17 +152,13 @@ public abstract class DatabaseSync {
             // Calculate the mapping relationship between upstream and downstream tables
             tableMapping.put(
                     schema.getTableIdentifier(), String.format("%s.%s", targetDb, dorisTable));
-            if (!dorisSystem.tableExists(targetDb, dorisTable)) {
-                TableSchema dorisSchema = schema.convertTableSchema(tableConfig);
-                // set doris target database
-                dorisSchema.setDatabase(targetDb);
-                dorisSchema.setTable(dorisTable);
-                if (tableBucketsMap != null) {
-                    setTableSchemaBuckets(tableBucketsMap, dorisSchema, dorisTable, bucketsTable);
-                }
-                dorisSystem.createTable(dorisSchema);
-            }
-            if (!dorisTables.contains(Tuple2.of(targetDb, dorisTable))) {
+            if (tryCreateTableIfAbsent(
+                    dorisSystem,
+                    targetDb,
+                    dorisTable,
+                    schema,
+                    tableBucketsMap,
+                    tablesWithBucketsAssigned)) {
                 dorisTables.add(Tuple2.of(targetDb, dorisTable));
             }
         }
@@ -462,6 +463,41 @@ public abstract class DatabaseSync {
         }
     }
 
+    private boolean tryCreateTableIfAbsent(
+            DorisSystem dorisSystem,
+            String targetDb,
+            String dorisTable,
+            SourceSchema schema,
+            Map<String, Integer> tableBucketsMap,
+            Set<String> tableBucketsSet) {
+        if (!dorisSystem.tableExists(targetDb, dorisTable)) {
+            TableSchema dorisSchema = schema.convertTableSchema(tableConfig);
+            dorisSchema.setDatabase(targetDb);
+            dorisSchema.setTable(dorisTable);
+            // set the table buckets of table
+            if (tableBucketsMap != null) {
+                setTableSchemaBuckets(tableBucketsMap, dorisSchema, dorisTable, tableBucketsSet);
+            }
+            try {
+                dorisSystem.createTable(dorisSchema);
+                return true;
+            } catch (Exception ex) {
+                handleTableCreationFailure(ex);
+            }
+        }
+        return false;
+    }
+
+    private void handleTableCreationFailure(Exception ex) throws DorisSystemException {
+        if (ignoreIncompatible && ex.getCause() instanceof SQLSyntaxErrorException) {
+            LOG.warn(
+                    "Doris schema and source table schema are not compatible. Error: {} ",
+                    ex.getCause().toString());
+        } else {
+            throw new DorisSystemException("Failed to create table due to: ", ex);
+        }
+    }
+
     public DatabaseSync setEnv(StreamExecutionEnvironment env) {
         this.env = env;
         return this;
@@ -529,6 +565,11 @@ public abstract class DatabaseSync {
         return this;
     }
 
+    public DatabaseSync setIgnoreIncompatible(boolean ignoreIncompatible) {
+        this.ignoreIncompatible = ignoreIncompatible;
+        return this;
+    }
+
     public DatabaseSync setTablePrefix(String tablePrefix) {
         this.tablePrefix = tablePrefix;
         return this;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index 88176e84..2410ddac 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -73,6 +73,7 @@ public class CdcMysqlSyncDatabaseCase {
         boolean ignoreDefaultValue = false;
         boolean useNewSchemaChange = false;
         boolean singleSink = false;
+        boolean ignoreIncompatible = false;
         DatabaseSync databaseSync = new MysqlDatabaseSync();
         databaseSync
                 .setEnv(env)
@@ -90,6 +91,7 @@ public class CdcMysqlSyncDatabaseCase {
                 .setCreateTableOnly(false)
                 .setNewSchemaChange(useNewSchemaChange)
                 .setSingleSink(singleSink)
+                .setIgnoreIncompatible(ignoreIncompatible)
                 .create();
         databaseSync.build();
         env.execute(String.format("MySQL-Doris Database Sync: %s", database));
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index 7bfa7477..fba5866c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -77,6 +77,7 @@ public class CdcOraclelSyncDatabaseCase {
         String multiToOneTarget = "a|b";
         boolean ignoreDefaultValue = false;
         boolean useNewSchemaChange = false;
+        boolean ignoreIncompatible = false;
         DatabaseSync databaseSync = new OracleDatabaseSync();
         databaseSync
                 .setEnv(env)
@@ -93,6 +94,7 @@ public class CdcOraclelSyncDatabaseCase {
                 .setTableConfig(tableConfig)
                 .setCreateTableOnly(false)
                 .setNewSchemaChange(useNewSchemaChange)
+                .setIgnoreIncompatible(ignoreIncompatible)
                 .create();
         databaseSync.build();
         env.execute(String.format("Oracle-Doris Database Sync: %s", database));
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
index b9afc98b..6c933409 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -79,6 +79,7 @@ public class CdcPostgresSyncDatabaseCase {
         String multiToOneTarget = "a|b";
         boolean ignoreDefaultValue = false;
         boolean useNewSchemaChange = false;
+        boolean ignoreIncompatible = false;
         DatabaseSync databaseSync = new PostgresDatabaseSync();
         databaseSync
                 .setEnv(env)
@@ -95,6 +96,7 @@ public class CdcPostgresSyncDatabaseCase {
                 .setTableConfig(tableConfig)
                 .setCreateTableOnly(false)
                 .setNewSchemaChange(useNewSchemaChange)
+                .setIgnoreIncompatible(ignoreIncompatible)
                 .create();
         databaseSync.build();
         env.execute(String.format("Postgres-Doris Database Sync: %s", database));
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
index 912bcfc7..9fec63b6 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -77,6 +77,7 @@ public class CdcSqlServerSyncDatabaseCase {
         String multiToOneTarget = "a|b";
         boolean ignoreDefaultValue = false;
         boolean useNewSchemaChange = false;
+        boolean ignoreIncompatible = false;
         DatabaseSync databaseSync = new SqlServerDatabaseSync();
         databaseSync
                 .setEnv(env)
@@ -93,6 +94,7 @@ public class CdcSqlServerSyncDatabaseCase {
                 .setTableConfig(tableConfig)
                 .setCreateTableOnly(false)
                 .setNewSchemaChange(useNewSchemaChange)
+                .setIgnoreIncompatible(ignoreIncompatible)
                 .create();
         databaseSync.build();
         env.execute(String.format("SqlServer-Doris Database Sync: %s", database));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to