This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new bf855b1  [feature][cdc] specify bucket when synchronizing database (#288)
bf855b1 is described below

commit bf855b16666247b13b5cc83358f3242a345f7de1
Author: Petrichor <[email protected]>
AuthorDate: Mon Jan 15 10:40:20 2024 +0800

    [feature][cdc] specify bucket when synchronizing database (#288)
---
 .../doris/flink/catalog/doris/DorisSystem.java     | 24 +++++-
 .../doris/flink/catalog/doris/TableSchema.java     | 10 +++
 .../org/apache/doris/flink/tools/cdc/CdcTools.java |  3 +-
 .../apache/doris/flink/tools/cdc/DatabaseSync.java | 67 +++++++++++++++
 .../apache/doris/flink/tools/cdc/SourceSchema.java |  2 +-
 .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java  |  2 +-
 .../tools/cdc/CdcOraclelSyncDatabaseCase.java      |  2 +-
 .../tools/cdc/CdcPostgresSyncDatabaseCase.java     |  2 +-
 .../tools/cdc/CdcSqlServerSyncDatabaseCase.java    |  2 +-
 .../doris/flink/tools/cdc/DatabaseSyncTest.java    | 96 +++++++++++++++++++++-
 10 files changed, 199 insertions(+), 11 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 11e19b9..269affa 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -48,6 +48,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 public class DorisSystem implements Serializable {
     private static final long serialVersionUID = 1L;
     private static final Logger LOG = LoggerFactory.getLogger(DorisSystem.class);
+    private static final String TABLE_BUCKETS = "table-buckets";
     private final JdbcConnectionProvider jdbcConnectionProvider;
     private static final List<String> builtinDatabases =
             Collections.singletonList("information_schema");
@@ -199,11 +200,28 @@ public class DorisSystem implements Serializable {
         // append distribute key
         sb.append(" DISTRIBUTED BY HASH(")
                 .append(String.join(",", identifier(schema.getDistributeKeys())))
-                .append(") BUCKETS AUTO ");
+                .append(")");
 
+        Map<String, String> properties = schema.getProperties();
+        if (schema.getTableBuckets() != null) {
+
+            int bucketsNum = schema.getTableBuckets();
+            if (bucketsNum <= 0) {
+                throw new CreateTableException("The number of buckets must be positive.");
+            }
+            sb.append(" BUCKETS ").append(bucketsNum);
+        } else {
+            sb.append(" BUCKETS AUTO ");
+        }
         // append properties
         int index = 0;
-        for (Map.Entry<String, String> entry : schema.getProperties().entrySet()) {
+        int skipProNum = 0;
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            // skip table-buckets
+            if (entry.getKey().equals(TABLE_BUCKETS)) {
+                skipProNum++;
+                continue;
+            }
             if (index == 0) {
                 sb.append(" PROPERTIES (");
             }
@@ -215,7 +233,7 @@ public class DorisSystem implements Serializable {
                     .append(quoteProperties(entry.getValue()));
             index++;
 
-            if (index == schema.getProperties().size()) {
+            if (index == (schema.getProperties().size() - skipProNum)) {
                 sb.append(")");
             }
         }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
index a9b2af0..f3da962 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
@@ -32,6 +32,8 @@ public class TableSchema {
     private List<String> distributeKeys = new ArrayList<>();
     private Map<String, String> properties = new HashMap<>();
 
+    private Integer tableBuckets;
+
     public String getDatabase() {
         return database;
     }
@@ -95,4 +97,12 @@ public class TableSchema {
     public void setProperties(Map<String, String> properties) {
         this.properties = properties;
     }
+
+    public void setTableBuckets(Integer tableBuckets) {
+        this.tableBuckets = tableBuckets;
+    }
+
+    public Integer getTableBuckets() {
+        return tableBuckets;
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 2841c25..9caddd2 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -28,6 +28,7 @@ import org.apache.doris.flink.tools.cdc.postgres.PostgresDatabaseSync;
 import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerDatabaseSync;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,7 +39,7 @@ public class CdcTools {
     private static final String ORACLE_SYNC_DATABASE = "oracle-sync-database";
     private static final String POSTGRES_SYNC_DATABASE = "postgres-sync-database";
     private static final String SQLSERVER_SYNC_DATABASE = "sqlserver-sync-database";
-    private static final List<String> EMPTY_KEYS = Arrays.asList("password");
+    private static final List<String> EMPTY_KEYS = Collections.singletonList("password");
 
     public static void main(String[] args) throws Exception {
         String operation = args[0].toLowerCase();
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 77bd631..f1e172b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -43,9 +43,12 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -117,6 +120,12 @@ public abstract class DatabaseSync {
 
         List<String> syncTables = new ArrayList<>();
         List<String> dorisTables = new ArrayList<>();
+
+        Map<String, Integer> tableBucketsMap = null;
+        if (tableConfig.containsKey("table-buckets")) {
+            tableBucketsMap = getTableBuckets(tableConfig.get("table-buckets"));
+        }
+        Set<String> bucketsTable = new HashSet<>();
         for (SourceSchema schema : schemaList) {
             syncTables.add(schema.getTableName());
             String dorisTable = converter.convert(schema.getTableName());
@@ -129,6 +138,9 @@ public abstract class DatabaseSync {
                 // set doris target database
                 dorisSchema.setDatabase(database);
                 dorisSchema.setTable(dorisTable);
+                if (tableBucketsMap != null) {
+                    setTableSchemaBuckets(tableBucketsMap, dorisSchema, dorisTable, bucketsTable);
+                }
                 dorisSystem.createTable(dorisSchema);
             }
             if (!dorisTables.contains(dorisTable)) {
@@ -338,6 +350,61 @@ public abstract class DatabaseSync {
         return multiToOneRulesPattern;
     }
 
+    /**
+     * Get table buckets Map.
+     *
+     * @param tableBuckets the string of tableBuckets, eg:student:10,student_info:20,student.*:30
+     * @return The table name and buckets map. The key is table name, the value is buckets.
+     */
+    public Map<String, Integer> getTableBuckets(String tableBuckets) {
+        Map<String, Integer> tableBucketsMap = new LinkedHashMap<>();
+        String[] tableBucketsArray = tableBuckets.split(",");
+        for (String tableBucket : tableBucketsArray) {
+            String[] tableBucketArray = tableBucket.split(":");
+            tableBucketsMap.put(
+                    tableBucketArray[0].trim(), Integer.parseInt(tableBucketArray[1].trim()));
+        }
+        return tableBucketsMap;
+    }
+
+    /**
+     * Set table schema buckets.
+     *
+     * @param tableBucketsMap The table name and buckets map. The key is table name, the value is
+     *     buckets.
+     * @param dorisSchema @{TableSchema}
+     * @param dorisTable the table name need to set buckets
+     * @param tableHasSet The buckets table is set
+     */
+    public void setTableSchemaBuckets(
+            Map<String, Integer> tableBucketsMap,
+            TableSchema dorisSchema,
+            String dorisTable,
+            Set<String> tableHasSet) {
+
+        if (tableBucketsMap != null) {
+            // Firstly, if the table name is in the table-buckets map, set the buckets of the table.
+            if (tableBucketsMap.containsKey(dorisTable)) {
+                dorisSchema.setTableBuckets(tableBucketsMap.get(dorisTable));
+                tableHasSet.add(dorisTable);
+                return;
+            }
+            // Secondly, iterate over the map to find a corresponding regular expression match,
+            for (Map.Entry<String, Integer> entry : tableBucketsMap.entrySet()) {
+                if (tableHasSet.contains(entry.getKey())) {
+                    continue;
+                }
+
+                Pattern pattern = Pattern.compile(entry.getKey());
+                if (pattern.matcher(dorisTable).matches()) {
+                    dorisSchema.setTableBuckets(entry.getValue());
+                    tableHasSet.add(dorisTable);
+                    return;
+                }
+            }
+        }
+    }
+
     public DatabaseSync setEnv(StreamExecutionEnvironment env) {
         this.env = env;
         return this;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
index 426d9ed..1ad09d7 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -138,7 +138,7 @@ public abstract class SourceSchema {
         return tableName;
     }
 
-    public LinkedHashMap<String, FieldSchema> getFields() {
+    public Map<String, FieldSchema> getFields() {
         return fields;
     }
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index 4e28fc4..3cc9db8 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -64,7 +64,7 @@ public class CdcMysqlSyncDatabaseCase {
 
         Map<String, String> tableConfig = new HashMap<>();
         tableConfig.put("replication_num", "1");
-
+        tableConfig.put("table-buckets", "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
         // String includingTables = "tbl1|tbl2|tbl3";
         String includingTables = "a_.*|b_.*|c";
         String excludingTables = "";
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index 24606d6..312345c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -69,7 +69,7 @@ public class CdcOraclelSyncDatabaseCase {
 
         Map<String, String> tableConfig = new HashMap<>();
         tableConfig.put("replication_num", "1");
-
+        tableConfig.put("table-buckets", "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
         String includingTables = "a_.*|b_.*|c";
         String excludingTables = "";
         String multiToOneOrigin = "a_.*|b_.*";
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
index 9b17c34..b9afc98 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -72,7 +72,7 @@ public class CdcPostgresSyncDatabaseCase {
 
         Map<String, String> tableConfig = new HashMap<>();
         tableConfig.put("replication_num", "1");
-
+        tableConfig.put("table-buckets", "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
         String includingTables = "a_.*|b_.*|c";
         String excludingTables = "";
         String multiToOneOrigin = "a_.*|b_.*";
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
index 89dcddd..912bcfc 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -70,7 +70,7 @@ public class CdcSqlServerSyncDatabaseCase {
 
         Map<String, String> tableConfig = new HashMap<>();
         tableConfig.put("replication_num", "1");
-
+        tableConfig.put("table-buckets", "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
         String includingTables = "a_.*|b_.*|c";
         String excludingTables = "";
         String multiToOneOrigin = "a_.*|b_.*";
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
index 293d15e..23dbff0 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
@@ -19,11 +19,20 @@ package org.apache.doris.flink.tools.cdc;
 
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
-import org.junit.Assert;
+import org.jetbrains.annotations.NotNull;
 import org.junit.Test;
 
+import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
 
 /** Unit tests for the {@link DatabaseSync}. */
 public class DatabaseSyncTest {
@@ -53,6 +62,89 @@ public class DatabaseSyncTest {
         config.setString("table-name", "tbl.*");
         databaseSync.setConfig(config);
         String syncTableList = databaseSync.getSyncTableList(Arrays.asList("tbl_1", "tbl_2"));
-        Assert.assertEquals("db\\.tbl_1|db\\.tbl_2", syncTableList);
+        assertEquals("db\\.tbl_1|db\\.tbl_2", syncTableList);
+    }
+
+    @Test
+    public void getTableBucketsTest() throws SQLException {
+        String tableBuckets = "tbl1:10,tbl2 : 20, a.* :30,b.*:40,.*:50";
+        DatabaseSync databaseSync = new MysqlDatabaseSync();
+        Map<String, Integer> tableBucketsMap = databaseSync.getTableBuckets(tableBuckets);
+        assertEquals(10, tableBucketsMap.get("tbl1").intValue());
+        assertEquals(20, tableBucketsMap.get("tbl2").intValue());
+        assertEquals(30, tableBucketsMap.get("a.*").intValue());
+        assertEquals(40, tableBucketsMap.get("b.*").intValue());
+        assertEquals(50, tableBucketsMap.get(".*").intValue());
+    }
+
+    @Test
+    public void setTableSchemaBucketsTest() throws SQLException {
+        DatabaseSync databaseSync = new MysqlDatabaseSync();
+        String tableSchemaBuckets = "tbl1:10,tbl2:20,a11.*:30,a1.*:40,b.*:50,b1.*:60,.*:70";
+        Map<String, Integer> tableBucketsMap = databaseSync.getTableBuckets(tableSchemaBuckets);
+        List<String> tableList =
+                Arrays.asList(
+                        "tbl1", "tbl2", "tbl3", "a11", "a111", "a12", "a13", "b1", "b11", "b2",
+                        "c1", "d1");
+        HashMap<String, Integer> matchedTableBucketsMap = mockTableBuckets();
+        Set<String> tableSet = new HashSet<>();
+        tableList.forEach(
+                tableName -> {
+                    TableSchema tableSchema = new TableSchema();
+                    tableSchema.setTable(tableName);
+                    databaseSync.setTableSchemaBuckets(
+                            tableBucketsMap, tableSchema, tableName, tableSet);
+                    assertEquals(
+                            matchedTableBucketsMap.get(tableName), tableSchema.getTableBuckets());
+                });
+    }
+
+    @Test
+    public void setTableSchemaBucketsTest1() throws SQLException {
+        DatabaseSync databaseSync = new MysqlDatabaseSync();
+        String tableSchemaBuckets = ".*:10,a.*:20,tbl:30,b.*:40";
+        Map<String, Integer> tableBucketsMap = databaseSync.getTableBuckets(tableSchemaBuckets);
+        List<String> tableList = Arrays.asList("a1", "a2", "a3", "b1", "a");
+        HashMap<String, Integer> matchedTableBucketsMap = mockTableBuckets1();
+        Set<String> tableSet = new HashSet<>();
+        tableList.forEach(
+                tableName -> {
+                    TableSchema tableSchema = new TableSchema();
+                    tableSchema.setTable(tableName);
+                    databaseSync.setTableSchemaBuckets(
+                            tableBucketsMap, tableSchema, tableName, tableSet);
+                    assertEquals(
+                            matchedTableBucketsMap.get(tableName), tableSchema.getTableBuckets());
+                });
+    }
+
+    @NotNull
+    private static HashMap<String, Integer> mockTableBuckets() {
+        HashMap<String, Integer> matchedTableBucketsMap = new HashMap<>();
+        matchedTableBucketsMap.put("tbl1", 10);
+        matchedTableBucketsMap.put("tbl2", 20);
+        matchedTableBucketsMap.put("a11", 30);
+        matchedTableBucketsMap.put("a111", 30);
+        matchedTableBucketsMap.put("a12", 40);
+        matchedTableBucketsMap.put("a13", 40);
+        matchedTableBucketsMap.put("b1", 50);
+        matchedTableBucketsMap.put("b11", 50);
+        matchedTableBucketsMap.put("b2", 50);
+        matchedTableBucketsMap.put("c1", 70);
+        matchedTableBucketsMap.put("d1", 70);
+        matchedTableBucketsMap.put("tbl3", 70);
+        return matchedTableBucketsMap;
+    }
+
+    @NotNull
+    private static HashMap<String, Integer> mockTableBuckets1() {
+        HashMap<String, Integer> matchedTableBucketsMap = new HashMap<>();
+        matchedTableBucketsMap.put("a", 10);
+        matchedTableBucketsMap.put("a1", 10);
+        matchedTableBucketsMap.put("a2", 10);
+        matchedTableBucketsMap.put("a3", 10);
+        matchedTableBucketsMap.put("b1", 10);
+        matchedTableBucketsMap.put("tbl1", 10);
+        return matchedTableBucketsMap;
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to