This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d61f3d2659 [flink] kafka_sync_database supports different prefix and 
suffix for different db (#4704)
d61f3d2659 is described below

commit d61f3d2659572d44fc028e5e16f957504ee1a7f8
Author: JackeyLee007 <[email protected]>
AuthorDate: Sun Dec 15 13:44:47 2024 +0800

    [flink] kafka_sync_database supports different prefix and suffix for 
different db (#4704)
---
 docs/content/cdc-ingestion/kafka-cdc.md            |  2 +
 .../shortcodes/generated/kafka_sync_database.html  | 12 +++++-
 .../flink/action/cdc/CdcActionCommonUtils.java     |  2 +
 .../flink/action/cdc/SyncDatabaseActionBase.java   | 34 +++++++++++++++-
 .../action/cdc/SyncDatabaseActionFactoryBase.java  |  4 ++
 .../flink/action/cdc/TableNameConverter.java       | 47 +++++++++++++++++++---
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  |  3 +-
 .../flink/action/cdc/TableNameConverterTest.java   | 42 +++++++++++++++++--
 pom.xml                                            |  1 +
 9 files changed, 133 insertions(+), 14 deletions(-)

diff --git a/docs/content/cdc-ingestion/kafka-cdc.md 
b/docs/content/cdc-ingestion/kafka-cdc.md
index b037937c55..26a5be3409 100644
--- a/docs/content/cdc-ingestion/kafka-cdc.md
+++ b/docs/content/cdc-ingestion/kafka-cdc.md
@@ -199,7 +199,9 @@ To use this feature through `flink run`, run the following 
shell command.
     --warehouse <warehouse-path> \
     --database <database-name> \
     [--table_mapping <table-name>=<paimon-table-name>] \
+    [--table_prefix_db <paimon-table-prefix-by-db>] \
     [--table_prefix <paimon-table-prefix>] \
+    [--table_suffix_db <paimon-table-suffix-by-db>] \
     [--table_suffix <paimon-table-suffix>] \
     [--including_tables <table-name|name-regular-expr>] \
     [--excluding_tables <table-name|name-regular-expr>] \
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html 
b/docs/layouts/shortcodes/generated/kafka_sync_database.html
index 6c90f1d7f7..3664128a26 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_database.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -41,13 +41,21 @@ under the License.
         <td><h5>--table_mapping</h5></td>
         <td>The table name mapping between source database and Paimon. For 
example, if you want to synchronize a source table named "test" to a Paimon 
table named "paimon_test", you can specify "--table_mapping test=paimon_test". 
Multiple mappings could be specified with multiple "--table_mapping" options. 
"--table_mapping" has higher priority than "--table_prefix" and 
"--table_suffix".</td>
     </tr>
+    <tr>
+        <td><h5>--table_prefix_db</h5></td>
+        <td>The prefix of the Paimon tables to be synchronized from the 
specified db. For example, if you want to prefix the tables from db1 with 
"ods_db1_", you can specify "--table_prefix_db db1=ods_db1_". 
"--table_prefix_db" has higher priority than "--table_prefix".</td>
+    </tr>
     <tr>
         <td><h5>--table_prefix</h5></td>
-        <td>The prefix of all Paimon tables to be synchronized. For example, 
if you want all synchronized tables to have "ods_" as prefix, you can specify 
"--table_prefix ods_".</td>
+        <td>The prefix of all Paimon tables to be synchronized except those 
specified by "--table_mapping" or "--table_prefix_db". For example, if you want 
all synchronized tables to have "ods_" as prefix, you can specify 
"--table_prefix ods_".</td>
+    </tr>
+    <tr>
+        <td><h5>--table_suffix_db</h5></td>
+        <td>The suffix of the Paimon tables to be synchronized from the 
specified db. The usage is the same as "--table_prefix_db".</td>
     </tr>
     <tr>
         <td><h5>--table_suffix</h5></td>
-        <td>The suffix of all Paimon tables to be synchronized. The usage is 
same as "--table_prefix".</td>
+        <td>The suffix of all Paimon tables to be synchronized except those 
specified by "--table_mapping" or "--table_suffix_db". The usage is the same as 
"--table_prefix".</td>
     </tr>
     <tr>
         <td><h5>--including_tables</h5></td>
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index 83891c90b8..c8af6f91c4 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -56,6 +56,8 @@ public class CdcActionCommonUtils {
     public static final String PULSAR_CONF = "pulsar_conf";
     public static final String TABLE_PREFIX = "table_prefix";
     public static final String TABLE_SUFFIX = "table_suffix";
+    public static final String TABLE_PREFIX_DB = "table_prefix_db";
+    public static final String TABLE_SUFFIX_DB = "table_suffix_db";
     public static final String TABLE_MAPPING = "table_mapping";
     public static final String INCLUDING_TABLES = "including_tables";
     public static final String EXCLUDING_TABLES = "excluding_tables";
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index ac3483ac23..4fb1339c51 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -53,6 +53,8 @@ public abstract class SyncDatabaseActionBase extends 
SynchronizationActionBase {
     protected String tablePrefix = "";
     protected String tableSuffix = "";
     protected Map<String, String> tableMapping = new HashMap<>();
+    protected Map<String, String> dbPrefix = new HashMap<>();
+    protected Map<String, String> dbSuffix = new HashMap<>();
     protected String includingTables = ".*";
     protected List<String> partitionKeys = new ArrayList<>();
     protected List<String> primaryKeys = new ArrayList<>();
@@ -98,6 +100,30 @@ public abstract class SyncDatabaseActionBase extends 
SynchronizationActionBase {
         return this;
     }
 
+    public SyncDatabaseActionBase withDbPrefix(Map<String, String> dbPrefix) {
+        if (dbPrefix != null) {
+            this.dbPrefix =
+                    dbPrefix.entrySet().stream()
+                            .collect(
+                                    HashMap::new,
+                                    (m, e) -> m.put(e.getKey().toLowerCase(), 
e.getValue()),
+                                    HashMap::putAll);
+        }
+        return this;
+    }
+
+    public SyncDatabaseActionBase withDbSuffix(Map<String, String> dbSuffix) {
+        if (dbSuffix != null) {
+            this.dbSuffix =
+                    dbSuffix.entrySet().stream()
+                            .collect(
+                                    HashMap::new,
+                                    (m, e) -> m.put(e.getKey().toLowerCase(), 
e.getValue()),
+                                    HashMap::putAll);
+        }
+        return this;
+    }
+
     public SyncDatabaseActionBase withTableMapping(Map<String, String> 
tableMapping) {
         if (tableMapping != null) {
             this.tableMapping = tableMapping;
@@ -164,7 +190,13 @@ public abstract class SyncDatabaseActionBase extends 
SynchronizationActionBase {
                 excludingTables == null ? null : 
Pattern.compile(excludingTables);
         TableNameConverter tableNameConverter =
                 new TableNameConverter(
-                        allowUpperCase, mergeShards, tablePrefix, tableSuffix, 
tableMapping);
+                        allowUpperCase,
+                        mergeShards,
+                        dbPrefix,
+                        dbSuffix,
+                        tablePrefix,
+                        tableSuffix,
+                        tableMapping);
         Set<String> createdTables;
         try {
             createdTables = new HashSet<>(catalog.listTables(database));
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
index 2135f2a281..d497b588c2 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
@@ -31,7 +31,9 @@ import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_MAPPING;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX_DB;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX_DB;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;
 
 /** Base {@link ActionFactory} for synchronizing into database. */
@@ -52,6 +54,8 @@ public abstract class SyncDatabaseActionFactoryBase<T extends 
SyncDatabaseAction
     protected void withParams(MultipleParameterToolAdapter params, T action) {
         action.withTablePrefix(params.get(TABLE_PREFIX))
                 .withTableSuffix(params.get(TABLE_SUFFIX))
+                .withDbPrefix(optionalConfigMap(params, TABLE_PREFIX_DB))
+                .withDbSuffix(optionalConfigMap(params, TABLE_SUFFIX_DB))
                 .withTableMapping(optionalConfigMap(params, TABLE_MAPPING))
                 .includingTables(params.get(INCLUDING_TABLES))
                 .excludingTables(params.get(EXCLUDING_TABLES))
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
index 4eca8b903e..15fc3507ce 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
@@ -31,6 +31,8 @@ public class TableNameConverter implements Serializable {
 
     private final boolean caseSensitive;
     private final boolean mergeShards;
+    private final Map<String, String> dbPrefix;
+    private final Map<String, String> dbSuffix;
     private final String prefix;
     private final String suffix;
     private final Map<String, String> tableMapping;
@@ -45,21 +47,54 @@ public class TableNameConverter implements Serializable {
             String prefix,
             String suffix,
             Map<String, String> tableMapping) {
+        this(
+                caseSensitive,
+                mergeShards,
+                new HashMap<>(),
+                new HashMap<>(),
+                prefix,
+                suffix,
+                tableMapping);
+    }
+
+    public TableNameConverter(
+            boolean caseSensitive,
+            boolean mergeShards,
+            Map<String, String> dbPrefix,
+            Map<String, String> dbSuffix,
+            String prefix,
+            String suffix,
+            Map<String, String> tableMapping) {
         this.caseSensitive = caseSensitive;
         this.mergeShards = mergeShards;
+        this.dbPrefix = dbPrefix;
+        this.dbSuffix = dbSuffix;
         this.prefix = prefix;
         this.suffix = suffix;
         this.tableMapping = lowerMapKey(tableMapping);
     }
 
-    public String convert(String originName) {
-        if (tableMapping.containsKey(originName.toLowerCase())) {
-            String mappedName = tableMapping.get(originName.toLowerCase());
+    public String convert(String originDbName, String originTblName) {
+        // top priority: table mapping
+        if (tableMapping.containsKey(originTblName.toLowerCase())) {
+            String mappedName = tableMapping.get(originTblName.toLowerCase());
             return caseSensitive ? mappedName : mappedName.toLowerCase();
         }
 
-        String tableName = caseSensitive ? originName : 
originName.toLowerCase();
-        return prefix + tableName + suffix;
+        String tblPrefix = prefix;
+        String tblSuffix = suffix;
+
+        // second priority: prefix and suffix specified by db
+        if (dbPrefix.containsKey(originDbName.toLowerCase())) {
+            tblPrefix = dbPrefix.get(originDbName.toLowerCase());
+        }
+        if (dbSuffix.containsKey(originDbName.toLowerCase())) {
+            tblSuffix = dbSuffix.get(originDbName.toLowerCase());
+        }
+
+        // third priority: normal prefix and suffix
+        String tableName = caseSensitive ? originTblName : 
originTblName.toLowerCase();
+        return tblPrefix + tableName + tblSuffix;
     }
 
     public String convert(Identifier originIdentifier) {
@@ -69,7 +104,7 @@ public class TableNameConverter implements Serializable {
                         : originIdentifier.getDatabaseName()
                                 + "_"
                                 + originIdentifier.getObjectName();
-        return convert(rawName);
+        return convert(originIdentifier.getDatabaseName(), rawName);
     }
 
     private Map<String, String> lowerMapKey(Map<String, String> map) {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 235b3f9a32..ce2e9124a6 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -143,7 +143,8 @@ public class MySqlSyncDatabaseAction extends 
SyncDatabaseActionBase {
         for (JdbcTableInfo tableInfo : jdbcTableInfos) {
             Identifier identifier =
                     Identifier.create(
-                            database, 
tableNameConverter.convert(tableInfo.toPaimonTableName()));
+                            database,
+                            tableNameConverter.convert("", 
tableInfo.toPaimonTableName()));
             FileStoreTable table;
             Schema fromMySql =
                     CdcActionCommonUtils.buildPaimonSchema(
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java
index dfbe32e3d3..89bbadfeb8 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java
@@ -33,13 +33,47 @@ public class TableNameConverterTest {
         tableMapping.put("mapped_src", "mapped_TGT");
         TableNameConverter caseConverter =
                 new TableNameConverter(true, true, "pre_", "_pos", 
tableMapping);
-        Assert.assertEquals(caseConverter.convert("mapped_SRC"), "mapped_TGT");
+        Assert.assertEquals(caseConverter.convert("", "mapped_SRC"), 
"mapped_TGT");
 
-        Assert.assertEquals(caseConverter.convert("unmapped_src"), 
"pre_unmapped_src_pos");
+        Assert.assertEquals(caseConverter.convert("", "unmapped_src"), 
"pre_unmapped_src_pos");
 
         TableNameConverter noCaseConverter =
                 new TableNameConverter(false, true, "pre_", "_pos", 
tableMapping);
-        Assert.assertEquals(noCaseConverter.convert("mapped_src"), 
"mapped_tgt");
-        Assert.assertEquals(noCaseConverter.convert("unmapped_src"), 
"pre_unmapped_src_pos");
+        Assert.assertEquals(noCaseConverter.convert("", "mapped_src"), 
"mapped_tgt");
+        Assert.assertEquals(noCaseConverter.convert("", "unmapped_src"), 
"pre_unmapped_src_pos");
+    }
+
+    @Test
+    public void testConvertTableNameByDBPrefix_Suffix() {
+        Map<String, String> dbPrefix = new HashMap<>(2);
+        dbPrefix.put("db_with_prefix", "db_pref_");
+        dbPrefix.put("db_with_prefix_suffix", "db_pref_");
+
+        Map<String, String> dbSuffix = new HashMap<>(2);
+        dbSuffix.put("db_with_suffix", "_db_suff");
+        dbSuffix.put("db_with_prefix_suffix", "_db_suff");
+
+        TableNameConverter tblNameConverter =
+                new TableNameConverter(false, true, dbPrefix, dbSuffix, 
"pre_", "_suf", null);
+
+        // Tables in the specified db should have the specified prefix and 
suffix.
+
+        // db prefix + normal suffix
+        Assert.assertEquals(
+                "db_pref_table_name_suf", 
tblNameConverter.convert("db_with_prefix", "table_name"));
+
+        // normal prefix + db suffix
+        Assert.assertEquals(
+                "pre_table_name_db_suff", 
tblNameConverter.convert("db_with_suffix", "table_name"));
+
+        // db prefix + db suffix
+        Assert.assertEquals(
+                "db_pref_table_name_db_suff",
+                tblNameConverter.convert("db_with_prefix_suffix", 
"table_name"));
+
+        // only normal prefix and suffix
+        Assert.assertEquals(
+                "pre_table_name_suf",
+                tblNameConverter.convert("db_without_prefix_suffix", 
"table_name"));
     }
 }
diff --git a/pom.xml b/pom.xml
index 904b1c73c7..dbef98af06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -529,6 +529,7 @@ under the License.
                         <exclude>release/**</exclude>
                         <!-- antlr grammar files -->
                         <exclude>paimon-common/src/main/antlr4/**</exclude>
+                        
<exclude>paimon-core/src/test/resources/compatibility/**</exclude>
                     </excludes>
                 </configuration>
             </plugin>

Reply via email to