This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new e49f7e4 [fix][cdc] fix uid conflicts during multi-database synchronization. (#382)
e49f7e4 is described below
commit e49f7e49cebd561ba12b9a0d6e933248d6a43b92
Author: Petrichor <[email protected]>
AuthorDate: Mon May 13 14:07:15 2024 +0800
[fix][cdc] fix uid conflicts during multi-database synchronization. (#382)
---
.../apache/doris/flink/tools/cdc/DatabaseSync.java | 26 ++++++++++++++++++++--
1 file changed, 24 insertions(+), 2 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 632edcc..a1f511a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -129,12 +129,14 @@ public abstract class DatabaseSync {
tableBucketsMap =
getTableBuckets(tableConfig.get("table-buckets"));
}
Set<String> bucketsTable = new HashSet<>();
+ Set<String> targetDbSet = new HashSet<>();
for (SourceSchema schema : schemaList) {
syncTables.add(schema.getTableName());
String targetDb = database;
// Synchronize multiple databases using the src database name
if (StringUtils.isNullOrWhitespaceOnly(targetDb)) {
targetDb = schema.getDatabaseName();
+ targetDbSet.add(targetDb);
}
if (StringUtils.isNullOrWhitespaceOnly(database)
&& !dorisSystem.databaseExists(targetDb)) {
@@ -177,15 +179,35 @@ public abstract class DatabaseSync {
int sinkParallel =
sinkConfig.getInteger(
DorisConfigOptions.SINK_PARALLELISM,
sideOutput.getParallelism());
+ String uidName = getUidName(targetDbSet, dbTbl);
sideOutput
.sinkTo(buildDorisSink(dbTbl.f0 + "." + dbTbl.f1))
.setParallelism(sinkParallel)
- .name(dbTbl.f1)
- .uid(dbTbl.f1);
+ .name(uidName)
+ .uid(uidName);
}
}
}
+ /**
+ * @param targetDbSet The set of target databases.
+ * @param dbTbl The database-table tuple.
+ * @return The UID of the DataStream.
+ */
+ public String getUidName(Set<String> targetDbSet, Tuple2<String, String> dbTbl) {
+ String uidName;
+ // Determine whether to proceed with multi-database synchronization.
+ // if yes, the UID is composed of `dbname_tablename`, otherwise it is composed of
+ // `tablename`.
+ if (targetDbSet.size() > 1) {
+ uidName = dbTbl.f0 + "_" + dbTbl.f1;
+ } else {
+ uidName = dbTbl.f1;
+ }
+
+ return uidName;
+ }
+
private DorisConnectionOptions getDorisConnectionOptions() {
String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]