This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new b7b5802f [improve](cdc) add config for uniq index to primary key with create table (#524)
b7b5802f is described below
commit b7b5802fae187705a53e2793ca2fd0c211550d28
Author: wudi <[email protected]>
AuthorDate: Fri Dec 6 10:41:39 2024 +0800
[improve](cdc) add config for uniq index to primary key with create table (#524)
---
.../java/org/apache/doris/flink/tools/cdc/DatabaseSync.java | 5 +++++
.../org/apache/doris/flink/tools/cdc/DorisTableConfig.java | 12 ++++++++++++
.../org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java | 5 +----
.../java/org/apache/doris/flink/tools/cdc/SourceSchema.java | 6 +++++-
.../container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt | 3 ++-
5 files changed, 25 insertions(+), 6 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index dea4422c..701bd967 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -463,6 +463,11 @@ public abstract class DatabaseSync {
private void tryCreateTableIfAbsent(
DorisSystem dorisSystem, String targetDb, String dorisTable,
SourceSchema schema) {
if (!dorisSystem.tableExists(targetDb, dorisTable)) {
+ if (dorisTableConfig.isConvertUniqToPk()
+ && CollectionUtil.isNullOrEmpty(schema.primaryKeys)
+ && !CollectionUtil.isNullOrEmpty(schema.uniqueIndexs)) {
+ schema.primaryKeys = new ArrayList<>(schema.uniqueIndexs);
+ }
TableSchema dorisSchema =
DorisSchemaFactory.createTableSchema(
database,
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
index 6f5d929e..6014f249 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DorisTableConfig.java
@@ -32,6 +32,7 @@ public class DorisTableConfig implements Serializable {
public static final String REPLICATION_NUM = "replication_num";
public static final String TABLE_BUCKETS = "table-buckets";
public static final String TABLE_PARTITIONS = "table-partitions";
+ public static final String CONVERT_UNIQ_TO_PK = "convert-uniq-to-pk";
private final Map<String, String> tableProperties;
// The specific parameters extracted from --table-conf need to be parsed
and integrated into the
@@ -39,6 +40,8 @@ public class DorisTableConfig implements Serializable {
private Map<String, Integer> tableBuckets;
// table:partitionColumn:interval
private Map<String, Tuple2<String, String>> tablePartitions;
+ // uniq index to primary key
+ private boolean convertUniqToPk = false;
// Only for testing
@VisibleForTesting
@@ -64,6 +67,11 @@ public class DorisTableConfig implements Serializable {
tableConfig.remove(TABLE_PARTITIONS);
}
+ if (tableConfig.containsKey(CONVERT_UNIQ_TO_PK)) {
+ this.convertUniqToPk =
Boolean.parseBoolean(tableConfig.get(CONVERT_UNIQ_TO_PK));
+ tableConfig.remove(CONVERT_UNIQ_TO_PK);
+ }
+
tableProperties = tableConfig;
}
@@ -79,6 +87,10 @@ public class DorisTableConfig implements Serializable {
return tablePartitions;
}
+ public boolean isConvertUniqToPk() {
+ return convertUniqToPk;
+ }
+
/**
* Build table bucket Map.
*
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
index 2547b976..aa64037b 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
@@ -49,10 +49,7 @@ public abstract class JdbcSourceSchema extends SourceSchema {
super(databaseName, schemaName, tableName, tableComment);
fields = getColumnInfo(metaData, databaseName, schemaName, tableName);
primaryKeys = getPrimaryKeys(metaData, databaseName, schemaName,
tableName);
- if (primaryKeys.isEmpty()) {
- List<String> uniqIndex = getUniqIndex(metaData, databaseName,
schemaName, tableName);
- primaryKeys.addAll(uniqIndex);
- }
+ uniqueIndexs = getUniqIndex(metaData, databaseName, schemaName,
tableName);
}
public LinkedHashMap<String, FieldSchema> getColumnInfo(
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
index de3e7975..aed1754c 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java
@@ -37,6 +37,7 @@ public abstract class SourceSchema {
protected final String tableComment;
protected LinkedHashMap<String, FieldSchema> fields;
public List<String> primaryKeys;
+ public List<String> uniqueIndexs;
public DataModel model = DataModel.UNIQUE;
public SourceSchema(
@@ -64,7 +65,6 @@ public abstract class SourceSchema {
if (!StringUtils.isNullOrWhitespaceOnly(schemaName)) {
identifier.add(schemaName);
}
-
if (!StringUtils.isNullOrWhitespaceOnly(tableName)) {
identifier.add(tableName);
}
@@ -115,6 +115,10 @@ public abstract class SourceSchema {
return primaryKeys;
}
+ public List<String> getUniqueIndexs() {
+ return uniqueIndexs;
+ }
+
public String getTableComment() {
return tableComment;
}
diff --git
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt
index 632c3735..053dc9ef 100644
---
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt
+++
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt
@@ -2,4 +2,5 @@ mysql-sync-database
--including-tables "create_tbl_.*"
--create-table-only
--table-conf
table-buckets=create_tbl_uniq:10,create_tbl_from_uniqindex.*:30
- --table-conf replication_num=1
\ No newline at end of file
+ --table-conf replication_num=1
+ --table-conf convert-uniq-to-pk=true
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]