This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new bf855b1 [feature][cdc] specify bucket when synchronizing database (#288)
bf855b1 is described below
commit bf855b16666247b13b5cc83358f3242a345f7de1
Author: Petrichor <[email protected]>
AuthorDate: Mon Jan 15 10:40:20 2024 +0800
[feature][cdc] specify bucket when synchronizing database (#288)
---
.../doris/flink/catalog/doris/DorisSystem.java | 24 +++++-
.../doris/flink/catalog/doris/TableSchema.java | 10 +++
.../org/apache/doris/flink/tools/cdc/CdcTools.java | 3 +-
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 67 +++++++++++++++
.../apache/doris/flink/tools/cdc/SourceSchema.java | 2 +-
.../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 2 +-
.../tools/cdc/CdcOraclelSyncDatabaseCase.java | 2 +-
.../tools/cdc/CdcPostgresSyncDatabaseCase.java | 2 +-
.../tools/cdc/CdcSqlServerSyncDatabaseCase.java | 2 +-
.../doris/flink/tools/cdc/DatabaseSyncTest.java | 96 +++++++++++++++++++++-
10 files changed, 199 insertions(+), 11 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 11e19b9..269affa 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -48,6 +48,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
public class DorisSystem implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG =
LoggerFactory.getLogger(DorisSystem.class);
+ private static final String TABLE_BUCKETS = "table-buckets";
private final JdbcConnectionProvider jdbcConnectionProvider;
private static final List<String> builtinDatabases =
Collections.singletonList("information_schema");
@@ -199,11 +200,28 @@ public class DorisSystem implements Serializable {
// append distribute key
sb.append(" DISTRIBUTED BY HASH(")
.append(String.join(",",
identifier(schema.getDistributeKeys())))
- .append(") BUCKETS AUTO ");
+ .append(")");
+ Map<String, String> properties = schema.getProperties();
+ if (schema.getTableBuckets() != null) {
+
+ int bucketsNum = schema.getTableBuckets();
+ if (bucketsNum <= 0) {
+ throw new CreateTableException("The number of buckets must be positive.");
+ }
+ sb.append(" BUCKETS ").append(bucketsNum);
+ } else {
+ sb.append(" BUCKETS AUTO ");
+ }
// append properties
int index = 0;
- for (Map.Entry<String, String> entry : schema.getProperties().entrySet()) {
+ int skipProNum = 0;
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ // skip table-buckets
+ if (entry.getKey().equals(TABLE_BUCKETS)) {
+ skipProNum++;
+ continue;
+ }
if (index == 0) {
sb.append(" PROPERTIES (");
}
@@ -215,7 +233,7 @@ public class DorisSystem implements Serializable {
.append(quoteProperties(entry.getValue()));
index++;
- if (index == schema.getProperties().size()) {
+ if (index == (schema.getProperties().size() - skipProNum)) {
sb.append(")");
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
index a9b2af0..f3da962 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java
@@ -32,6 +32,8 @@ public class TableSchema {
private List<String> distributeKeys = new ArrayList<>();
private Map<String, String> properties = new HashMap<>();
+ private Integer tableBuckets;
+
public String getDatabase() {
return database;
}
@@ -95,4 +97,12 @@ public class TableSchema {
public void setProperties(Map<String, String> properties) {
this.properties = properties;
}
+
+ public void setTableBuckets(Integer tableBuckets) {
+ this.tableBuckets = tableBuckets;
+ }
+
+ public Integer getTableBuckets() {
+ return tableBuckets;
+ }
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 2841c25..9caddd2 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -28,6 +28,7 @@ import org.apache.doris.flink.tools.cdc.postgres.PostgresDatabaseSync;
import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerDatabaseSync;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,7 +39,7 @@ public class CdcTools {
private static final String ORACLE_SYNC_DATABASE = "oracle-sync-database";
private static final String POSTGRES_SYNC_DATABASE =
"postgres-sync-database";
private static final String SQLSERVER_SYNC_DATABASE =
"sqlserver-sync-database";
- private static final List<String> EMPTY_KEYS = Arrays.asList("password");
+ private static final List<String> EMPTY_KEYS = Collections.singletonList("password");
public static void main(String[] args) throws Exception {
String operation = args[0].toLowerCase();
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 77bd631..f1e172b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -43,9 +43,12 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -117,6 +120,12 @@ public abstract class DatabaseSync {
List<String> syncTables = new ArrayList<>();
List<String> dorisTables = new ArrayList<>();
+
+ Map<String, Integer> tableBucketsMap = null;
+ if (tableConfig.containsKey("table-buckets")) {
+ tableBucketsMap = getTableBuckets(tableConfig.get("table-buckets"));
+ }
+ Set<String> bucketsTable = new HashSet<>();
for (SourceSchema schema : schemaList) {
syncTables.add(schema.getTableName());
String dorisTable = converter.convert(schema.getTableName());
@@ -129,6 +138,9 @@ public abstract class DatabaseSync {
// set doris target database
dorisSchema.setDatabase(database);
dorisSchema.setTable(dorisTable);
+ if (tableBucketsMap != null) {
+ setTableSchemaBuckets(tableBucketsMap, dorisSchema, dorisTable, bucketsTable);
+ }
dorisSystem.createTable(dorisSchema);
}
if (!dorisTables.contains(dorisTable)) {
@@ -338,6 +350,61 @@ public abstract class DatabaseSync {
return multiToOneRulesPattern;
}
+ /**
+ * Get table buckets Map.
+ *
+ * @param tableBuckets the string of tableBuckets, eg:student:10,student_info:20,student.*:30
+ * @return The table name and buckets map. The key is table name, the value is buckets.
+ */
+ public Map<String, Integer> getTableBuckets(String tableBuckets) {
+ Map<String, Integer> tableBucketsMap = new LinkedHashMap<>();
+ String[] tableBucketsArray = tableBuckets.split(",");
+ for (String tableBucket : tableBucketsArray) {
+ String[] tableBucketArray = tableBucket.split(":");
+ tableBucketsMap.put(
+ tableBucketArray[0].trim(), Integer.parseInt(tableBucketArray[1].trim()));
+ }
+ return tableBucketsMap;
+ }
+
+ /**
+ * Set table schema buckets.
+ *
+ * @param tableBucketsMap The table name and buckets map. The key is table name, the value is
+ * buckets.
+ * @param dorisSchema @{TableSchema}
+ * @param dorisTable the table name need to set buckets
+ * @param tableHasSet The buckets table is set
+ */
+ public void setTableSchemaBuckets(
+ Map<String, Integer> tableBucketsMap,
+ TableSchema dorisSchema,
+ String dorisTable,
+ Set<String> tableHasSet) {
+
+ if (tableBucketsMap != null) {
+ // Firstly, if the table name is in the table-buckets map, set the buckets of the table.
+ if (tableBucketsMap.containsKey(dorisTable)) {
+ dorisSchema.setTableBuckets(tableBucketsMap.get(dorisTable));
+ tableHasSet.add(dorisTable);
+ return;
+ }
+ // Secondly, iterate over the map to find a corresponding regular expression match,
+ for (Map.Entry<String, Integer> entry : tableBucketsMap.entrySet()) {
+ if (tableHasSet.contains(entry.getKey())) {
+ continue;
+ }
+
+ Pattern pattern = Pattern.compile(entry.getKey());
+ if (pattern.matcher(dorisTable).matches()) {
+ dorisSchema.setTableBuckets(entry.getValue());
+ tableHasSet.add(dorisTable);
+ return;
+ }
+ }
+ }
+ }
+
public DatabaseSync setEnv(StreamExecutionEnvironment env) {
this.env = env;
return this;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
index 426d9ed..1ad09d7 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -138,7 +138,7 @@ public abstract class SourceSchema {
return tableName;
}
- public LinkedHashMap<String, FieldSchema> getFields() {
+ public Map<String, FieldSchema> getFields() {
return fields;
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index 4e28fc4..3cc9db8 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -64,7 +64,7 @@ public class CdcMysqlSyncDatabaseCase {
Map<String, String> tableConfig = new HashMap<>();
tableConfig.put("replication_num", "1");
-
+ tableConfig.put("table-buckets", "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
// String includingTables = "tbl1|tbl2|tbl3";
String includingTables = "a_.*|b_.*|c";
String excludingTables = "";
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index 24606d6..312345c 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -69,7 +69,7 @@ public class CdcOraclelSyncDatabaseCase {
Map<String, String> tableConfig = new HashMap<>();
tableConfig.put("replication_num", "1");
-
+ tableConfig.put("table-buckets", "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
String includingTables = "a_.*|b_.*|c";
String excludingTables = "";
String multiToOneOrigin = "a_.*|b_.*";
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
index 9b17c34..b9afc98 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java
@@ -72,7 +72,7 @@ public class CdcPostgresSyncDatabaseCase {
Map<String, String> tableConfig = new HashMap<>();
tableConfig.put("replication_num", "1");
-
+ tableConfig.put("table-buckets", "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
String includingTables = "a_.*|b_.*|c";
String excludingTables = "";
String multiToOneOrigin = "a_.*|b_.*";
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
index 89dcddd..912bcfc 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java
@@ -70,7 +70,7 @@ public class CdcSqlServerSyncDatabaseCase {
Map<String, String> tableConfig = new HashMap<>();
tableConfig.put("replication_num", "1");
-
+ tableConfig.put("table-buckets", "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
String includingTables = "a_.*|b_.*|c";
String excludingTables = "";
String multiToOneOrigin = "a_.*|b_.*";
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
index 293d15e..23dbff0 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
@@ -19,11 +19,20 @@ package org.apache.doris.flink.tools.cdc;
import org.apache.flink.configuration.Configuration;
+import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
-import org.junit.Assert;
+import org.jetbrains.annotations.NotNull;
import org.junit.Test;
+import java.sql.SQLException;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
/** Unit tests for the {@link DatabaseSync}. */
public class DatabaseSyncTest {
@@ -53,6 +62,89 @@ public class DatabaseSyncTest {
config.setString("table-name", "tbl.*");
databaseSync.setConfig(config);
String syncTableList =
databaseSync.getSyncTableList(Arrays.asList("tbl_1", "tbl_2"));
- Assert.assertEquals("db\\.tbl_1|db\\.tbl_2", syncTableList);
+ assertEquals("db\\.tbl_1|db\\.tbl_2", syncTableList);
+ }
+
+ @Test
+ public void getTableBucketsTest() throws SQLException {
+ String tableBuckets = "tbl1:10,tbl2 : 20, a.* :30,b.*:40,.*:50";
+ DatabaseSync databaseSync = new MysqlDatabaseSync();
+ Map<String, Integer> tableBucketsMap = databaseSync.getTableBuckets(tableBuckets);
+ assertEquals(10, tableBucketsMap.get("tbl1").intValue());
+ assertEquals(20, tableBucketsMap.get("tbl2").intValue());
+ assertEquals(30, tableBucketsMap.get("a.*").intValue());
+ assertEquals(40, tableBucketsMap.get("b.*").intValue());
+ assertEquals(50, tableBucketsMap.get(".*").intValue());
+ }
+
+ @Test
+ public void setTableSchemaBucketsTest() throws SQLException {
+ DatabaseSync databaseSync = new MysqlDatabaseSync();
+ String tableSchemaBuckets = "tbl1:10,tbl2:20,a11.*:30,a1.*:40,b.*:50,b1.*:60,.*:70";
+ Map<String, Integer> tableBucketsMap = databaseSync.getTableBuckets(tableSchemaBuckets);
+ List<String> tableList =
+ Arrays.asList(
+ "tbl1", "tbl2", "tbl3", "a11", "a111", "a12", "a13", "b1", "b11", "b2",
+ "c1", "d1");
+ HashMap<String, Integer> matchedTableBucketsMap = mockTableBuckets();
+ Set<String> tableSet = new HashSet<>();
+ tableList.forEach(
+ tableName -> {
+ TableSchema tableSchema = new TableSchema();
+ tableSchema.setTable(tableName);
+ databaseSync.setTableSchemaBuckets(
+ tableBucketsMap, tableSchema, tableName, tableSet);
+ assertEquals(
+ matchedTableBucketsMap.get(tableName), tableSchema.getTableBuckets());
+ });
+ }
+
+ @Test
+ public void setTableSchemaBucketsTest1() throws SQLException {
+ DatabaseSync databaseSync = new MysqlDatabaseSync();
+ String tableSchemaBuckets = ".*:10,a.*:20,tbl:30,b.*:40";
+ Map<String, Integer> tableBucketsMap = databaseSync.getTableBuckets(tableSchemaBuckets);
+ List<String> tableList = Arrays.asList("a1", "a2", "a3", "b1", "a");
+ HashMap<String, Integer> matchedTableBucketsMap = mockTableBuckets1();
+ Set<String> tableSet = new HashSet<>();
+ tableList.forEach(
+ tableName -> {
+ TableSchema tableSchema = new TableSchema();
+ tableSchema.setTable(tableName);
+ databaseSync.setTableSchemaBuckets(
+ tableBucketsMap, tableSchema, tableName, tableSet);
+ assertEquals(
+ matchedTableBucketsMap.get(tableName), tableSchema.getTableBuckets());
+ });
+ }
+
+ @NotNull
+ private static HashMap<String, Integer> mockTableBuckets() {
+ HashMap<String, Integer> matchedTableBucketsMap = new HashMap<>();
+ matchedTableBucketsMap.put("tbl1", 10);
+ matchedTableBucketsMap.put("tbl2", 20);
+ matchedTableBucketsMap.put("a11", 30);
+ matchedTableBucketsMap.put("a111", 30);
+ matchedTableBucketsMap.put("a12", 40);
+ matchedTableBucketsMap.put("a13", 40);
+ matchedTableBucketsMap.put("b1", 50);
+ matchedTableBucketsMap.put("b11", 50);
+ matchedTableBucketsMap.put("b2", 50);
+ matchedTableBucketsMap.put("c1", 70);
+ matchedTableBucketsMap.put("d1", 70);
+ matchedTableBucketsMap.put("tbl3", 70);
+ return matchedTableBucketsMap;
+ }
+
+ @NotNull
+ private static HashMap<String, Integer> mockTableBuckets1() {
+ HashMap<String, Integer> matchedTableBucketsMap = new HashMap<>();
+ matchedTableBucketsMap.put("a", 10);
+ matchedTableBucketsMap.put("a1", 10);
+ matchedTableBucketsMap.put("a2", 10);
+ matchedTableBucketsMap.put("a3", 10);
+ matchedTableBucketsMap.put("b1", 10);
+ matchedTableBucketsMap.put("tbl1", 10);
+ return matchedTableBucketsMap;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]