This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d28a31030 [flink] kafka_sync_database supports db whitelist and 
blacklist (#4732)
0d28a31030 is described below

commit 0d28a310301a36213c9b469afc0b7ebcf36dbdae
Author: JackeyLee007 <[email protected]>
AuthorDate: Wed Dec 18 09:52:21 2024 +0800

    [flink] kafka_sync_database supports db whitelist and blacklist (#4732)
---
 docs/content/cdc-ingestion/kafka-cdc.md            |   6 +-
 .../shortcodes/generated/kafka_sync_database.html  |  26 ++--
 .../flink/action/cdc/CdcActionCommonUtils.java     |   2 +
 .../flink/action/cdc/SyncDatabaseActionBase.java   |  26 +++-
 .../action/cdc/SyncDatabaseActionFactoryBase.java  |   4 +
 .../cdc/RichCdcMultiplexRecordEventParser.java     |  67 +++++++--
 .../action/cdc/SyncDatabaseActionBaseTest.java     | 156 +++++++++++++++++++++
 7 files changed, 261 insertions(+), 26 deletions(-)

diff --git a/docs/content/cdc-ingestion/kafka-cdc.md 
b/docs/content/cdc-ingestion/kafka-cdc.md
index 26a5be3409..fc16c5b0fc 100644
--- a/docs/content/cdc-ingestion/kafka-cdc.md
+++ b/docs/content/cdc-ingestion/kafka-cdc.md
@@ -199,12 +199,14 @@ To use this feature through `flink run`, run the 
following shell command.
     --warehouse <warehouse-path> \
     --database <database-name> \
     [--table_mapping <table-name>=<paimon-table-name>] \
-    [--table_prefix_db <paimon-table-prefix-by-db>] \
     [--table_prefix <paimon-table-prefix>] \
-    [--table_suffix_db <paimon-table-suffix-by-db>] \
     [--table_suffix <paimon-table-suffix>] \
+    [--table_prefix_db <paimon-table-prefix-by-db>] \
+    [--table_suffix_db <paimon-table-suffix-by-db>] \
     [--including_tables <table-name|name-regular-expr>] \
     [--excluding_tables <table-name|name-regular-expr>] \
+    [--including_dbs <database-name|name-regular-expr>] \
+    [--excluding_dbs <database-name|name-regular-expr>] \
     [--type_mapping to-string] \
     [--partition_keys <partition_keys>] \
     [--primary_keys <primary-keys>] \
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html 
b/docs/layouts/shortcodes/generated/kafka_sync_database.html
index 3664128a26..9f0b817e66 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_database.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -41,22 +41,22 @@ under the License.
         <td><h5>--table_mapping</h5></td>
         <td>The table name mapping between source database and Paimon. For 
example, if you want to synchronize a source table named "test" to a Paimon 
table named "paimon_test", you can specify "--table_mapping test=paimon_test". 
Multiple mappings could be specified with multiple "--table_mapping" options. 
"--table_mapping" has higher priority than "--table_prefix" and 
"--table_suffix".</td>
     </tr>
-    <tr>
-        <td><h5>--table_prefix_db</h5></td>
-        <td>The prefix of the Paimon tables to be synchronized from the 
specified db. For example, if you want to prefix the tables from db1 with 
"ods_db1_", you can specify "--table_prefix_db db1=ods_db1_". 
"--table_prefix_db" has higher priority than "--table_prefix".</td>
-    </tr>
     <tr>
         <td><h5>--table_prefix</h5></td>
         <td>The prefix of all Paimon tables to be synchronized except those 
specified by "--table_mapping" or "--table_prefix_db". For example, if you want 
all synchronized tables to have "ods_" as prefix, you can specify 
"--table_prefix ods_".</td>
     </tr>
-    <tr>
-        <td><h5>--table_suffix_db</h5></td>
-        <td>The suffix of the Paimon tables to be synchronized from the 
specified db. The usage is same as "--table_prefix_db".</td>
-    </tr>
     <tr>
         <td><h5>--table_suffix</h5></td>
         <td>The suffix of all Paimon tables to be synchronized except those 
specified by "--table_mapping" or "--table_suffix_db". The usage is same as 
"--table_prefix".</td>
     </tr>
+    <tr>
+        <td><h5>--table_prefix_db</h5></td>
+        <td>The prefix of the Paimon tables to be synchronized from the 
specified db. For example, if you want to prefix the tables from db1 with 
"ods_db1_", you can specify "--table_prefix_db db1=ods_db1_". 
"--table_prefix_db" has higher priority than "--table_prefix".</td>
+    </tr>
+    <tr>
+        <td><h5>--table_suffix_db</h5></td>
+        <td>The suffix of the Paimon tables to be synchronized from the 
specified db. The usage is same as "--table_prefix_db".</td>
+    </tr>
     <tr>
         <td><h5>--including_tables</h5></td>
         <td>It is used to specify which source tables are to be synchronized. 
You must use '|' to separate multiple tables. Because '|' is a special 
character, quotation marks are required, for example: 'a|b|c'. Regular 
expressions are supported, for example, specifying "--including_tables 
test|paimon.*" means to synchronize table 'test' and all tables that start with 
'paimon'.</td>
@@ -65,6 +65,14 @@ under the License.
         <td><h5>--excluding_tables</h5></td>
         <td>It is used to specify which source tables are not to be 
synchronized. The usage is same as "--including_tables". "--excluding_tables" 
has higher priority than "--including_tables" if you specified both.</td>
     </tr>
+    <tr>
+        <td><h5>--including_dbs</h5></td>
+        <td>It is used to specify the databases within which the tables are to 
be synchronized. The usage is the same as "--including_tables".</td>
+    </tr>
+    <tr>
+        <td><h5>--excluding_dbs</h5></td>
+        <td>It is used to specify the databases within which the tables are 
not to be synchronized. The usage is the same as "--excluding_tables". 
"--excluding_dbs" has higher priority than "--including_dbs" if you specify 
both.</td>
+    </tr>
     <tr>
         <td><h5>--type_mapping</h5></td>
         <td>It is used to specify how to map MySQL data type to Paimon 
type.<br />
@@ -114,4 +122,4 @@ under the License.
         <td>The configuration for Paimon table sink. Each configuration should 
be specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a 
complete list of table configurations.</td>
     </tr>
     </tbody>
-</table>
\ No newline at end of file
+</table>
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index 6482a625f4..c107500eba 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -61,6 +61,8 @@ public class CdcActionCommonUtils {
     public static final String TABLE_MAPPING = "table_mapping";
     public static final String INCLUDING_TABLES = "including_tables";
     public static final String EXCLUDING_TABLES = "excluding_tables";
+    public static final String INCLUDING_DBS = "including_dbs";
+    public static final String EXCLUDING_DBS = "excluding_dbs";
     public static final String TYPE_MAPPING = "type_mapping";
     public static final String PARTITION_KEYS = "partition_keys";
     public static final String PRIMARY_KEYS = "primary_keys";
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index 56334c1e7b..63e29d6a0e 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -60,6 +60,8 @@ public abstract class SyncDatabaseActionBase extends 
SynchronizationActionBase {
     protected List<String> partitionKeys = new ArrayList<>();
     protected List<String> primaryKeys = new ArrayList<>();
     @Nullable protected String excludingTables;
+    protected String includingDbs = ".*";
+    @Nullable protected String excludingDbs;
     protected List<FileStoreTable> tables = new ArrayList<>();
     protected Map<String, List<String>> partitionKeyMultiple = new HashMap<>();
 
@@ -144,6 +146,18 @@ public abstract class SyncDatabaseActionBase extends 
SynchronizationActionBase {
         return this;
     }
 
+    /**
+     * Sets the regular expression selecting the source databases to synchronize.
+     *
+     * @param includingDbs database-name pattern; {@code null} leaves the current pattern
+     *     untouched
+     * @return this action, for call chaining
+     */
+    public SyncDatabaseActionBase includingDbs(@Nullable String includingDbs) {
+        this.includingDbs = includingDbs == null ? this.includingDbs : includingDbs;
+        return this;
+    }
+
+    /**
+     * Sets the regular expression of the source databases to skip.
+     *
+     * @param excludingDbs database-name pattern, or {@code null} for no excluding pattern
+     * @return this action, for call chaining
+     */
+    public SyncDatabaseActionBase excludingDbs(@Nullable String excludingDbs) {
+        this.excludingDbs = excludingDbs;
+        return this;
+    }
+
     public SyncDatabaseActionBase withPartitionKeys(String... partitionKeys) {
         this.partitionKeys.addAll(Arrays.asList(partitionKeys));
         return this;
@@ -186,9 +200,11 @@ public abstract class SyncDatabaseActionBase extends 
SynchronizationActionBase {
                         requirePrimaryKeys(),
                         partitionKeyMultiple,
                         metadataConverters);
-        Pattern includingPattern = Pattern.compile(includingTables);
-        Pattern excludingPattern =
+        Pattern tblIncludingPattern = Pattern.compile(includingTables);
+        Pattern tblExcludingPattern =
                 excludingTables == null ? null : 
Pattern.compile(excludingTables);
+        Pattern dbIncludingPattern = Pattern.compile(includingDbs);
+        Pattern dbExcludingPattern = excludingDbs == null ? null : 
Pattern.compile(excludingDbs);
         TableNameConverter tableNameConverter =
                 new TableNameConverter(
                         caseSensitive,
@@ -207,8 +223,10 @@ public abstract class SyncDatabaseActionBase extends 
SynchronizationActionBase {
         return () ->
                 new RichCdcMultiplexRecordEventParser(
                         schemaBuilder,
-                        includingPattern,
-                        excludingPattern,
+                        tblIncludingPattern,
+                        tblExcludingPattern,
+                        dbIncludingPattern,
+                        dbExcludingPattern,
                         tableNameConverter,
                         createdTables);
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
index d497b588c2..c82039a9a0 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
@@ -24,7 +24,9 @@ import 
org.apache.paimon.flink.action.MultipleParameterToolAdapter;
 
 import java.util.Optional;
 
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_DBS;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_DBS;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MULTIPLE_TABLE_PARTITION_KEYS;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
@@ -59,6 +61,8 @@ public abstract class SyncDatabaseActionFactoryBase<T extends 
SyncDatabaseAction
                 .withTableMapping(optionalConfigMap(params, TABLE_MAPPING))
                 .includingTables(params.get(INCLUDING_TABLES))
                 .excludingTables(params.get(EXCLUDING_TABLES))
+                .includingDbs(params.get(INCLUDING_DBS))
+                .excludingDbs(params.get(EXCLUDING_DBS))
                 .withPartitionKeyMultiple(
                         optionalConfigMapList(params, 
MULTIPLE_TABLE_PARTITION_KEYS))
                 .withPartitionKeys();
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index 939410bf46..47367c4234 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -46,8 +46,10 @@ public class RichCdcMultiplexRecordEventParser implements 
EventParser<RichCdcMul
             LoggerFactory.getLogger(RichCdcMultiplexRecordEventParser.class);
 
     @Nullable private final NewTableSchemaBuilder schemaBuilder;
-    @Nullable private final Pattern includingPattern;
-    @Nullable private final Pattern excludingPattern;
+    @Nullable private final Pattern tblIncludingPattern;
+    @Nullable private final Pattern tblExcludingPattern;
+    @Nullable private final Pattern dbIncludingPattern;
+    @Nullable private final Pattern dbExcludingPattern;
     private final TableNameConverter tableNameConverter;
     private final Set<String> createdTables;
 
@@ -55,24 +57,32 @@ public class RichCdcMultiplexRecordEventParser implements 
EventParser<RichCdcMul
     private final Set<String> includedTables = new HashSet<>();
     private final Set<String> excludedTables = new HashSet<>();
 
+    private final Set<String> includedDbs = new HashSet<>();
+    private final Set<String> excludedDbs = new HashSet<>();
+
     private RichCdcMultiplexRecord record;
     private String currentTable;
+    private String currentDb;
     private boolean shouldSynchronizeCurrentTable;
     private RichEventParser currentParser;
 
     public RichCdcMultiplexRecordEventParser(boolean caseSensitive) {
-        this(null, null, null, new TableNameConverter(caseSensitive), new 
HashSet<>());
+        this(null, null, null, null, null, new 
TableNameConverter(caseSensitive), new HashSet<>());
     }
 
     public RichCdcMultiplexRecordEventParser(
             @Nullable NewTableSchemaBuilder schemaBuilder,
-            @Nullable Pattern includingPattern,
-            @Nullable Pattern excludingPattern,
+            @Nullable Pattern tblIncludingPattern,
+            @Nullable Pattern tblExcludingPattern,
+            @Nullable Pattern dbIncludingPattern,
+            @Nullable Pattern dbExcludingPattern,
             TableNameConverter tableNameConverter,
             Set<String> createdTables) {
         this.schemaBuilder = schemaBuilder;
-        this.includingPattern = includingPattern;
-        this.excludingPattern = excludingPattern;
+        this.tblIncludingPattern = tblIncludingPattern;
+        this.tblExcludingPattern = tblExcludingPattern;
+        this.dbIncludingPattern = dbIncludingPattern;
+        this.dbExcludingPattern = dbExcludingPattern;
         this.tableNameConverter = tableNameConverter;
         this.createdTables = createdTables;
     }
@@ -81,6 +91,7 @@ public class RichCdcMultiplexRecordEventParser implements 
EventParser<RichCdcMul
     public void setRawEvent(RichCdcMultiplexRecord record) {
         this.record = record;
         this.currentTable = record.tableName();
+        this.currentDb = record.databaseName();
         this.shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable();
         if (shouldSynchronizeCurrentTable) {
             this.currentParser = parsers.computeIfAbsent(currentTable, t -> 
new RichEventParser());
@@ -124,7 +135,41 @@ public class RichCdcMultiplexRecordEventParser implements 
EventParser<RichCdcMul
         return Optional.empty();
     }
 
+    /**
+     * Checks {@code currentDb} against the database including/excluding patterns.
+     *
+     * <p>Every decision is remembered in {@code includedDbs} or {@code excludedDbs}, so a
+     * database name already seen by this parser is answered from those sets.
+     *
+     * @return {@code true} if records of the current database may be synchronized
+     */
+    private boolean shouldSynchronizeCurrentDb() {
+        // An incomplete record may carry no database name: let it pass here and leave the
+        // null handling to the code that really needs the value.
+        if (currentDb == null || includedDbs.contains(currentDb)) {
+            return true;
+        }
+        if (excludedDbs.contains(currentDb)) {
+            return false;
+        }
+
+        // A missing pattern imposes no restriction; the excluding pattern wins over including.
+        boolean accepted =
+                (dbIncludingPattern == null || dbIncludingPattern.matcher(currentDb).matches())
+                        && (dbExcludingPattern == null
+                                || !dbExcludingPattern.matcher(currentDb).matches());
+        if (accepted) {
+            includedDbs.add(currentDb);
+            return true;
+        }
+
+        LOG.debug(
+                "Source database {} won't be synchronized because it was excluded. ",
+                currentDb);
+        excludedDbs.add(currentDb);
+        return false;
+    }
+
     private boolean shouldSynchronizeCurrentTable() {
+        if (!shouldSynchronizeCurrentDb()) {
+            return false;
+        }
         // In case the record is incomplete, we let the null value pass 
validation
         // and handle the null value when we really need it
         if (currentTable == null) {
@@ -139,12 +184,12 @@ public class RichCdcMultiplexRecordEventParser implements 
EventParser<RichCdcMul
         }
 
         boolean shouldSynchronize = true;
-        if (includingPattern != null) {
-            shouldSynchronize = 
includingPattern.matcher(currentTable).matches();
+        if (tblIncludingPattern != null) {
+            shouldSynchronize = 
tblIncludingPattern.matcher(currentTable).matches();
         }
-        if (excludingPattern != null) {
+        if (tblExcludingPattern != null) {
             shouldSynchronize =
-                    shouldSynchronize && 
!excludingPattern.matcher(currentTable).matches();
+                    shouldSynchronize && 
!tblExcludingPattern.matcher(currentTable).matches();
         }
         if (!shouldSynchronize) {
             LOG.debug(
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
new file mode 100644
index 0000000000..5247225caf
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc;
+
+import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncDatabaseAction;
+import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.types.RowKind;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Tests for {@link SyncDatabaseActionBase}. */
+public class SyncDatabaseActionBaseTest {
+    private static final String ANY_DB = "any_db";
+    private static final String WHITE_DB = "white_db";
+    private static final String BLACK_DB = "black_db";
+    private static final String WHITE_TBL = "white_tbl";
+    private static final String BLACK_TBL = "black_tbl";
+
+    private SyncDatabaseActionBase kafkaSyncDbAction;
+    private RichCdcMultiplexRecord whiteAnyDbCdcRecord;
+    private RichCdcMultiplexRecord blackAnyDbCdcRecord;
+    private RichCdcMultiplexRecord whiteCdcRecord;
+    private RichCdcMultiplexRecord blackCdcRecord;
+    private RichCdcMultiplexRecord whiteDbBlackTblCdcRecord;
+    private RichCdcMultiplexRecord blackDbWhiteTblCdcRecord;
+
+    @TempDir private java.nio.file.Path tmp;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        // Create the "default.db" directory inside the temporary directory.
+        new LocalFileIO().mkdirs(new Path(tmp.toString(), "default.db"));
+
+        kafkaSyncDbAction =
+                new KafkaSyncDatabaseAction(
+                        tmp.toString(), "default", new HashMap<>(), new HashMap<>());
+
+        // All fixtures share one INSERT payload; they differ only in database and table name.
+        Map<String, String> fields = new HashMap<>();
+        fields.put("field", "value");
+        CdcRecord insert = new CdcRecord(RowKind.INSERT, fields);
+
+        whiteAnyDbCdcRecord = newRecord(ANY_DB, WHITE_TBL, insert);
+        blackAnyDbCdcRecord = newRecord(ANY_DB, BLACK_TBL, insert);
+        whiteCdcRecord = newRecord(WHITE_DB, WHITE_TBL, insert);
+        blackCdcRecord = newRecord(BLACK_DB, WHITE_TBL, insert);
+
+        whiteDbBlackTblCdcRecord = newRecord(WHITE_DB, BLACK_TBL, insert);
+        blackDbWhiteTblCdcRecord = newRecord(BLACK_DB, WHITE_TBL, insert);
+    }
+
+    /** Builds a record for the given database and table, passing two fresh empty lists. */
+    private static RichCdcMultiplexRecord newRecord(String db, String table, CdcRecord data) {
+        return new RichCdcMultiplexRecord(db, table, Arrays.asList(), Arrays.asList(), data);
+    }
+
+    @Test
+    public void testSyncTablesWithoutDbLists() throws NoSuchMethodException, IOException {
+        kafkaSyncDbAction.includingTables(WHITE_TBL).excludingTables(BLACK_TBL);
+
+        RichCdcMultiplexRecordEventParser parser =
+                (RichCdcMultiplexRecordEventParser)
+                        kafkaSyncDbAction.buildEventParserFactory().create();
+
+        // No database list is configured here, so only the table lists are set.
+        parser.setRawEvent(whiteAnyDbCdcRecord);
+        Assert.assertEquals(1, parser.parseRecords().size());
+
+        parser.setRawEvent(blackAnyDbCdcRecord);
+        Assert.assertEquals(0, parser.parseRecords().size());
+    }
+
+    /** Only records whose database is whitelisted and not blacklisted are parsed. */
+    @Test
+    public void testSyncTablesWithDbList() {
+        kafkaSyncDbAction.includingDbs(WHITE_DB);
+        kafkaSyncDbAction.excludingDbs(BLACK_DB);
+        RichCdcMultiplexRecordEventParser parser =
+                (RichCdcMultiplexRecordEventParser)
+                        kafkaSyncDbAction.buildEventParserFactory().create();
+        List<CdcRecord> parsedRecords;
+
+        // any_db is not matched by the including pattern, whatever the table is
+        parser.setRawEvent(whiteAnyDbCdcRecord);
+        parsedRecords = parser.parseRecords();
+        Assert.assertEquals(0, parsedRecords.size());
+
+        parser.setRawEvent(blackAnyDbCdcRecord);
+        parsedRecords = parser.parseRecords();
+        Assert.assertEquals(0, parsedRecords.size());
+
+        // white db and white table
+        parser.setRawEvent(whiteCdcRecord);
+        parsedRecords = parser.parseRecords();
+        Assert.assertEquals(1, parsedRecords.size());
+
+        // black db and white table: previously blackAnyDbCdcRecord was replayed here,
+        // which left the blacklisted database unchecked in this test
+        parser.setRawEvent(blackCdcRecord);
+        parsedRecords = parser.parseRecords();
+        Assert.assertEquals(0, parsedRecords.size());
+    }
+
+    @Test
+    public void testSycTablesCrossDB() {
+        kafkaSyncDbAction
+                .includingDbs(WHITE_DB)
+                .excludingDbs(BLACK_DB)
+                .excludingTables(BLACK_TBL);
+
+        RichCdcMultiplexRecordEventParser parser =
+                (RichCdcMultiplexRecordEventParser)
+                        kafkaSyncDbAction.buildEventParserFactory().create();
+
+        // white db, black table
+        parser.setRawEvent(whiteDbBlackTblCdcRecord);
+        Assert.assertEquals(0, parser.parseRecords().size());
+
+        // black db, white table
+        parser.setRawEvent(blackDbWhiteTblCdcRecord);
+        Assert.assertEquals(0, parser.parseRecords().size());
+    }
+}

Reply via email to