This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 0611b93d [Improvment](cdc) Use uniq index as primary key (#516)
0611b93d is described below
commit 0611b93d0218c6ba9d547c50a23f910a0bbe1cf9
Author: wudi <[email protected]>
AuthorDate: Mon Nov 25 10:45:51 2024 +0800
[Improvment](cdc) Use uniq index as primary key (#516)
---
.../doris/flink/tools/cdc/JdbcSourceSchema.java | 33 ++++++++++++++++++++++
1 file changed, 33 insertions(+)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
index 31cfd1cb..2547b976 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java
@@ -27,8 +27,10 @@ import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
/**
* JdbcSourceSchema is a subclass of SourceSchema, used to build metadata
about jdbc-related
@@ -47,6 +49,10 @@ public abstract class JdbcSourceSchema extends SourceSchema {
super(databaseName, schemaName, tableName, tableComment);
fields = getColumnInfo(metaData, databaseName, schemaName, tableName);
primaryKeys = getPrimaryKeys(metaData, databaseName, schemaName,
tableName);
+ if (primaryKeys.isEmpty()) {
+ List<String> uniqIndex = getUniqIndex(metaData, databaseName,
schemaName, tableName);
+ primaryKeys.addAll(uniqIndex);
+ }
}
public LinkedHashMap<String, FieldSchema> getColumnInfo(
@@ -96,5 +102,32 @@ public abstract class JdbcSourceSchema extends SourceSchema
{
return primaryKeys;
}
+ /**
+ * Get the columns of a unique index of the table. If the primary key is empty
but there is a unique key, then
+ * the unique key is used instead of the primary key.
+ */
+ public List<String> getUniqIndex(
+ DatabaseMetaData metaData, String databaseName, String schemaName,
String tableName)
+ throws SQLException {
+ Map<String, List<String>> uniqIndexMap = new HashMap<>();
+ String firstIndexName = null;
+ try (ResultSet rs =
+ metaData.getIndexInfo(databaseName, schemaName, tableName,
true, true)) {
+ while (rs.next()) {
+ String columnName = rs.getString("COLUMN_NAME");
+ String indexName = rs.getString("INDEX_NAME");
+ if (firstIndexName == null) {
+ firstIndexName = indexName;
+ }
+ uniqIndexMap.computeIfAbsent(indexName, k -> new
ArrayList<>()).add(columnName);
+ }
+ }
+ if (!uniqIndexMap.isEmpty()) {
+ // If there are multiple unique indexes, return the columns of the first index name seen
+ return uniqIndexMap.get(firstIndexName);
+ }
+ return new ArrayList<>();
+ }
+
public abstract String convertToDorisType(String fieldType, Integer
precision, Integer scale);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]