This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new a4b4bdf  [feature] multiple tables to one for DatabaseSync (#208)
a4b4bdf is described below

commit a4b4bdfc92bb8fecefffb6f4f81b0a8f577d142e
Author: Antg <[email protected]>
AuthorDate: Mon Nov 6 16:43:46 2023 +0800

    [feature] multiple tables to one for DatabaseSync (#208)
---
 .../org/apache/doris/flink/tools/cdc/CdcTools.java |  4 +-
 .../apache/doris/flink/tools/cdc/DatabaseSync.java | 70 ++++++++++++++++++++--
 .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java  | 10 +++-
 .../tools/cdc/CdcOraclelSyncDatabaseCase.java      |  6 +-
 .../tools/cdc/CdcPostgresSyncDatabaseCase.java     |  6 +-
 .../tools/cdc/CdcSqlServerSyncDatabaseCase.java    | 10 ++--
 .../doris/flink/tools/cdc/DatabaseSyncTest.java    | 40 +++++++++++++
 7 files changed, 128 insertions(+), 18 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 6a390ea..8a8b3db 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -103,6 +103,8 @@ public class CdcTools {
         String tableSuffix = params.get("table-suffix");
         String includingTables = params.get("including-tables");
         String excludingTables = params.get("excluding-tables");
+        String multiToOneOrigin = params.get("multi-to-one-origin");
+        String multiToOneTarget = params.get("multi-to-one-target");
         boolean createTableOnly = params.has("create-table-only");
         boolean ignoreDefaultValue = params.has("ignore-default-value");
         boolean useNewSchemaChange = params.has("use-new-schema-change");
@@ -112,7 +114,7 @@ public class CdcTools {
         Configuration sinkConfig = Configuration.fromMap(sinkMap);
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        databaseSync.create(env, database, config, tablePrefix, tableSuffix, 
includingTables, excludingTables, ignoreDefaultValue, sinkConfig, tableMap, 
createTableOnly, useNewSchemaChange);
+        databaseSync.create(env, database, config, tablePrefix, tableSuffix, 
includingTables, excludingTables,multiToOneOrigin,multiToOneTarget, 
ignoreDefaultValue, sinkConfig, tableMap, createTableOnly, useNewSchemaChange);
         databaseSync.build();
         if(StringUtils.isNullOrWhitespaceOnly(jobName)){
             jobName = String.format("%s-Doris Sync Database: %s", type, 
config.getString("database-name","db"));
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index fcd0f4c..99c45eb 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -33,6 +33,7 @@ import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +59,7 @@ public abstract class DatabaseSync {
     protected TableNameConverter converter;
     protected Pattern includingPattern;
     protected Pattern excludingPattern;
+    protected Map<Pattern, String> multiToOneRulesPattern;
     protected Map<String, String> tableConfig;
     protected Configuration sinkConfig;
     protected boolean ignoreDefaultValue;
@@ -67,6 +69,8 @@ public abstract class DatabaseSync {
     private boolean newSchemaChange;
     protected String includingTables;
     protected String excludingTables;
+    protected String multiToOneOrigin;
+    protected String multiToOneTarget;
 
     public abstract void registerDriver() throws SQLException;
 
@@ -82,16 +86,19 @@ public abstract class DatabaseSync {
 
     public void create(StreamExecutionEnvironment env, String database, 
Configuration config,
                        String tablePrefix, String tableSuffix, String 
includingTables,
-                       String excludingTables, boolean ignoreDefaultValue, 
Configuration sinkConfig,
+                       String excludingTables,String multiToOneOrigin,String 
multiToOneTarget, boolean ignoreDefaultValue, Configuration sinkConfig,
             Map<String, String> tableConfig, boolean createTableOnly, boolean 
useNewSchemaChange) {
         this.env = env;
         this.config = config;
         this.database = database;
-        this.converter = new TableNameConverter(tablePrefix, tableSuffix);
         this.includingTables = includingTables;
         this.excludingTables = excludingTables;
+        this.multiToOneOrigin = multiToOneOrigin;
+        this.multiToOneTarget = multiToOneTarget;
         this.includingPattern = includingTables == null ? null : 
Pattern.compile(includingTables);
         this.excludingPattern = excludingTables == null ? null : 
Pattern.compile(excludingTables);
+        this.multiToOneRulesPattern = 
multiToOneRulesParser(multiToOneOrigin,multiToOneTarget);
+        this.converter = new TableNameConverter(tablePrefix, 
tableSuffix,multiToOneRulesPattern);
         this.ignoreDefaultValue = ignoreDefaultValue;
         this.sinkConfig = sinkConfig;
         this.tableConfig = tableConfig == null ? new HashMap<>() : tableConfig;
@@ -118,7 +125,7 @@ public abstract class DatabaseSync {
         List<String> dorisTables = new ArrayList<>();
         for (SourceSchema schema : schemaList) {
             syncTables.add(schema.getTableName());
-            String dorisTable = converter.convert(schema.getTableName());
+            String dorisTable=converter.convert(schema.getTableName());
             if (!dorisSystem.tableExists(database, dorisTable)) {
                 TableSchema dorisSchema = 
schema.convertTableSchema(tableConfig);
                 //set doris target database
@@ -126,7 +133,9 @@ public abstract class DatabaseSync {
                 dorisSchema.setTable(dorisTable);
                 dorisSystem.createTable(dorisSchema);
             }
-            dorisTables.add(dorisTable);
+            if(!dorisTables.contains(dorisTable)){
+                dorisTables.add(dorisTable);
+            }
         }
         if(createTableOnly){
             System.out.println("Create table finished.");
@@ -139,7 +148,6 @@ public abstract class DatabaseSync {
         for (String table : dorisTables) {
             OutputTag<String> recordOutputTag = 
ParsingProcessFunction.createRecordOutputTag(table);
             DataStream<String> sideOutput = 
parsedStream.getSideOutput(recordOutputTag);
-
             int sinkParallel = 
sinkConfig.getInteger(DorisConfigOptions.SINK_PARALLELISM, 
sideOutput.getParallelism());
             
sideOutput.sinkTo(buildDorisSink(table)).setParallelism(sinkParallel).name(table).uid(table);
         }
@@ -245,11 +253,36 @@ public abstract class DatabaseSync {
         LOG.debug("table {} is synchronized? {}", tableName, sync);
         return sync;
     }
+    /**
+     * Filter table that many tables merge to one
+     */
+    protected HashMap<Pattern,String> multiToOneRulesParser(String 
multiToOneOrigin,String multiToOneTarget){
+        if(StringUtils.isNullOrWhitespaceOnly(multiToOneOrigin) || 
StringUtils.isNullOrWhitespaceOnly(multiToOneTarget)){
+            return null;
+        }
+        HashMap<Pattern,String> multiToOneRulesPattern= new HashMap<>();
+        String[] origins = multiToOneOrigin.split("\\|");
+        String[] targets = multiToOneTarget.split("\\|");
+        if(origins.length!=targets.length){
+            System.out.println("param error : multi to one params length are 
not equal,please check your params.");
+            System.exit(1);
+        }
+        try {
+            for (int i = 0; i < origins.length; i++) {
+                
multiToOneRulesPattern.put(Pattern.compile(origins[i]),targets[i]);
+            }
+        } catch (Exception e) {
+            System.out.println("param error : Your regular expression is 
incorrect,please check.");
+            System.exit(1);
+        }
+        return multiToOneRulesPattern;
+    }
 
     public static class TableNameConverter implements Serializable {
         private static final long serialVersionUID = 1L;
         private final String prefix;
         private final String suffix;
+        private Map<Pattern,String> multiToOneRulesPattern;
 
         TableNameConverter(){
             this("","");
@@ -260,8 +293,33 @@ public abstract class DatabaseSync {
             this.suffix = suffix == null ? "" : suffix;
         }
 
+        TableNameConverter(String prefix, String suffix,Map<Pattern, String> 
multiToOneRulesPattern) {
+            this.prefix = prefix == null ? "" : prefix;
+            this.suffix = suffix == null ? "" : suffix;
+            this.multiToOneRulesPattern = multiToOneRulesPattern;
+        }
+
         public String convert(String tableName) {
-            return prefix + tableName + suffix;
+            if(multiToOneRulesPattern==null){
+                return prefix + tableName + suffix;
+            }
+
+            String target=null;
+
+            for (Map.Entry<Pattern, String> patternStringEntry : 
multiToOneRulesPattern.entrySet()) {
+                if(patternStringEntry.getKey().matcher(tableName).matches()){
+                    target=patternStringEntry.getValue();
+                }
+            }
+            /**
+             * If multiToOneRulesPattern is not null and target is not 
assigned,
+             * then the synchronization task contains both multi to one and 
one to one ,
+             * prefixes and suffixes are added to common one-to-one mapping 
tables
+             * */
+            if(target==null){
+                return prefix + tableName + suffix;
+            }
+            return target;
         }
     }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index 1a205b1..875fb4c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -48,7 +48,8 @@ public class CdcMysqlSyncDatabaseCase {
         mysqlConfig.put("hostname","127.0.0.1");
         mysqlConfig.put("port","3306");
         mysqlConfig.put("username","root");
-        mysqlConfig.put("password","");
+//        mysqlConfig.put("password","");
+        mysqlConfig.put("password","12345678");
         Configuration config = Configuration.fromMap(mysqlConfig);
 
         Map<String,String> sinkConfig = new HashMap<>();
@@ -63,12 +64,15 @@ public class CdcMysqlSyncDatabaseCase {
         Map<String,String> tableConfig = new HashMap<>();
         tableConfig.put("replication_num", "1");
 
-        String includingTables = "tbl1|tbl2|tbl3";
+//        String includingTables = "tbl1|tbl2|tbl3";
+        String includingTables = "a_.*|b_.*|c";
         String excludingTables = "";
+        String multiToOneOrigin="a_.*|b_.*";
+        String multiToOneTarget="a|b";
         boolean ignoreDefaultValue = false;
         boolean useNewSchemaChange = false;
         DatabaseSync databaseSync = new MysqlDatabaseSync();
-        
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig,
 false, useNewSchemaChange);
+        
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig,
 false, useNewSchemaChange);
         databaseSync.build();
         env.execute(String.format("MySQL-Doris Database Sync: %s", database));
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index 3a2a39e..9b6277f 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -69,12 +69,14 @@ public class CdcOraclelSyncDatabaseCase {
         Map<String,String> tableConfig = new HashMap<>();
         tableConfig.put("replication_num", "1");
 
-        String includingTables = "test.*";
+        String includingTables = "a_.*|b_.*|c";
         String excludingTables = "";
+        String multiToOneOrigin="a_.*|b_.*";
+        String multiToOneTarget="a|b";
         boolean ignoreDefaultValue = false;
         boolean useNewSchemaChange = false;
         DatabaseSync databaseSync = new OracleDatabaseSync();
-        
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig,
 false, useNewSchemaChange);
+        
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig,
 false, useNewSchemaChange);
         databaseSync.build();
         env.execute(String.format("Oracle-Doris Database Sync: %s", database));
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
index cf5e1d8..87fa871 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -72,12 +72,14 @@ public class CdcPostgresSyncDatabaseCase {
         Map<String,String> tableConfig = new HashMap<>();
         tableConfig.put("replication_num", "1");
 
-        String includingTables = "testcdc";
+        String includingTables = "a_.*|b_.*|c";
         String excludingTables = "";
+        String multiToOneOrigin="a_.*|b_.*";
+        String multiToOneTarget="a|b";
         boolean ignoreDefaultValue = false;
         boolean useNewSchemaChange = false;
         DatabaseSync databaseSync = new PostgresDatabaseSync();
-        
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig,
 false, useNewSchemaChange);
+        
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig,
 false, useNewSchemaChange);
         databaseSync.build();
         env.execute(String.format("Postgres-Doris Database Sync: %s", 
database));
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
index 7251a7f..d247500 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -51,7 +51,7 @@ public class CdcSqlServerSyncDatabaseCase {
         sourceConfig.put("hostname","127.0.0.1");
         sourceConfig.put("port","1433");
         sourceConfig.put("username","sa");
-        sourceConfig.put("password","123456");
+        sourceConfig.put("password","Passw@rd");
 //        
sourceConfig.put("debezium.database.tablename.case.insensitive","false");
 //        sourceConfig.put("scan.incremental.snapshot.enabled","true");
 //        sourceConfig.put("debezium.include.schema.changes","false");
@@ -70,14 +70,16 @@ public class CdcSqlServerSyncDatabaseCase {
         Map<String,String> tableConfig = new HashMap<>();
         tableConfig.put("replication_num", "1");
 
-        String includingTables = "products_test";
+        String includingTables = "a_.*|b_.*|c";
         String excludingTables = "";
+        String multiToOneOrigin="a_.*|b_.*";
+        String multiToOneTarget="a|b";
         boolean ignoreDefaultValue = false;
         boolean useNewSchemaChange = false;
         DatabaseSync databaseSync = new SqlServerDatabaseSync();
-        
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig,
 false, useNewSchemaChange);
+        
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,multiToOneOrigin,multiToOneTarget,ignoreDefaultValue,sinkConf,tableConfig,
 false, useNewSchemaChange);
         databaseSync.build();
-        env.execute(String.format("Postgres-Doris Database Sync: %s", 
database));
+        env.execute(String.format("SqlServer-Doris Database Sync: %s", 
database));
 
     }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
new file mode 100644
index 0000000..daab90b
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.tools.cdc;
+
+import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
+import org.junit.Test;
+import java.util.Arrays;
+
+/**
+ * Unit tests for the {@link DatabaseSync}.
+ **/
+public class DatabaseSyncTest {
+    @Test
+    public void multiToOneRulesParserTest() throws Exception{
+        String[][] testCase = {
+                {"a_.*|b_.*","a|b"} //  Normal condition
+//                ,{"a_.*|b_.*","a|b|c"} // Unequal length
+//                ,{"",""} // Null value
+//                ,{"***....","a"} // Abnormal regular expression
+        };
+        DatabaseSync databaseSync = new MysqlDatabaseSync();
+        Arrays.stream(testCase).forEach(arr->{
+            databaseSync.multiToOneRulesParser(arr[0], arr[1]);
+        });
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to