This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0d28a31030 [flink] kafka_sync_database supports db whitelist and
blacklist (#4732)
0d28a31030 is described below
commit 0d28a310301a36213c9b469afc0b7ebcf36dbdae
Author: JackeyLee007 <[email protected]>
AuthorDate: Wed Dec 18 09:52:21 2024 +0800
[flink] kafka_sync_database supports db whitelist and blacklist (#4732)
---
docs/content/cdc-ingestion/kafka-cdc.md | 6 +-
.../shortcodes/generated/kafka_sync_database.html | 26 ++--
.../flink/action/cdc/CdcActionCommonUtils.java | 2 +
.../flink/action/cdc/SyncDatabaseActionBase.java | 26 +++-
.../action/cdc/SyncDatabaseActionFactoryBase.java | 4 +
.../cdc/RichCdcMultiplexRecordEventParser.java | 67 +++++++--
.../action/cdc/SyncDatabaseActionBaseTest.java | 156 +++++++++++++++++++++
7 files changed, 261 insertions(+), 26 deletions(-)
diff --git a/docs/content/cdc-ingestion/kafka-cdc.md
b/docs/content/cdc-ingestion/kafka-cdc.md
index 26a5be3409..fc16c5b0fc 100644
--- a/docs/content/cdc-ingestion/kafka-cdc.md
+++ b/docs/content/cdc-ingestion/kafka-cdc.md
@@ -199,12 +199,14 @@ To use this feature through `flink run`, run the
following shell command.
--warehouse <warehouse-path> \
--database <database-name> \
[--table_mapping <table-name>=<paimon-table-name>] \
- [--table_prefix_db <paimon-table-prefix-by-db>] \
[--table_prefix <paimon-table-prefix>] \
- [--table_suffix_db <paimon-table-suffix-by-db>] \
[--table_suffix <paimon-table-suffix>] \
+ [--table_prefix_db <paimon-table-prefix-by-db>] \
+ [--table_suffix_db <paimon-table-suffix-by-db>] \
[--including_tables <table-name|name-regular-expr>] \
[--excluding_tables <table-name|name-regular-expr>] \
+ [--including_dbs <database-name|name-regular-expr>] \
+ [--excluding_dbs <database-name|name-regular-expr>] \
[--type_mapping to-string] \
[--partition_keys <partition_keys>] \
[--primary_keys <primary-keys>] \
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html
b/docs/layouts/shortcodes/generated/kafka_sync_database.html
index 3664128a26..9f0b817e66 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_database.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -41,22 +41,22 @@ under the License.
<td><h5>--table_mapping</h5></td>
<td>The table name mapping between source database and Paimon. For
example, if you want to synchronize a source table named "test" to a Paimon
table named "paimon_test", you can specify "--table_mapping test=paimon_test".
Multiple mappings could be specified with multiple "--table_mapping" options.
"--table_mapping" has higher priority than "--table_prefix" and
"--table_suffix".</td>
</tr>
- <tr>
- <td><h5>--table_prefix_db</h5></td>
- <td>The prefix of the Paimon tables to be synchronized from the
specified db. For example, if you want to prefix the tables from db1 with
"ods_db1_", you can specify "--table_prefix_db db1=ods_db1_".
"--table_prefix_db" has higher priority than "--table_prefix".</td>
- </tr>
<tr>
<td><h5>--table_prefix</h5></td>
<td>The prefix of all Paimon tables to be synchronized except those
specified by "--table_mapping" or "--table_prefix_db". For example, if you want
all synchronized tables to have "ods_" as prefix, you can specify
"--table_prefix ods_".</td>
</tr>
- <tr>
- <td><h5>--table_suffix_db</h5></td>
- <td>The suffix of the Paimon tables to be synchronized from the
specified db. The usage is same as "--table_prefix_db".</td>
- </tr>
<tr>
<td><h5>--table_suffix</h5></td>
<td>The suffix of all Paimon tables to be synchronized except those
specified by "--table_mapping" or "--table_suffix_db". The usage is same as
"--table_prefix".</td>
</tr>
+ <tr>
+ <td><h5>--table_prefix_db</h5></td>
+ <td>The prefix of the Paimon tables to be synchronized from the
specified db. For example, if you want to prefix the tables from db1 with
"ods_db1_", you can specify "--table_prefix_db db1=ods_db1_".
"--table_prefix_db" has higher priority than "--table_prefix".</td>
+ </tr>
+ <tr>
+ <td><h5>--table_suffix_db</h5></td>
+ <td>The suffix of the Paimon tables to be synchronized from the
specified db. The usage is same as "--table_prefix_db".</td>
+ </tr>
<tr>
<td><h5>--including_tables</h5></td>
<td>It is used to specify which source tables are to be synchronized.
You must use '|' to separate multiple tables. Because '|' is a special
character, a comma is required, for example: 'a|b|c'. Regular expression is
supported, for example, specifying "--including_tables test|paimon.*" means to
synchronize table 'test' and all tables start with 'paimon'.</td>
@@ -65,6 +65,14 @@ under the License.
<td><h5>--excluding_tables</h5></td>
<td>It is used to specify which source tables are not to be
synchronized. The usage is same as "--including_tables". "--excluding_tables"
has higher priority than "--including_tables" if you specified both.</td>
</tr>
+ <tr>
+ <td><h5>--including_dbs</h5></td>
+ <td>It is used to specify the databases within which the tables are to
be synchronized. The usage is same as "--including_tables".</td>
+ </tr>
+ <tr>
+ <td><h5>--excluding_dbs</h5></td>
+ <td>It is used to specify the databases within which the tables are
not to be synchronized. The usage is same as "--excluding_tables".
"--excluding_dbs" has higher priority than "--including_dbs" if you specified
both.</td>
+ </tr>
<tr>
<td><h5>--type_mapping</h5></td>
<td>It is used to specify how to map MySQL data type to Paimon
type.<br />
@@ -114,4 +122,4 @@ under the License.
<td>The configuration for Paimon table sink. Each configuration should
be specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a
complete list of table configurations.</td>
</tr>
</tbody>
-</table>
\ No newline at end of file
+</table>
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index 6482a625f4..c107500eba 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -61,6 +61,8 @@ public class CdcActionCommonUtils {
public static final String TABLE_MAPPING = "table_mapping";
public static final String INCLUDING_TABLES = "including_tables";
public static final String EXCLUDING_TABLES = "excluding_tables";
+ public static final String INCLUDING_DBS = "including_dbs";
+ public static final String EXCLUDING_DBS = "excluding_dbs";
public static final String TYPE_MAPPING = "type_mapping";
public static final String PARTITION_KEYS = "partition_keys";
public static final String PRIMARY_KEYS = "primary_keys";
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index 56334c1e7b..63e29d6a0e 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -60,6 +60,8 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
protected List<String> partitionKeys = new ArrayList<>();
protected List<String> primaryKeys = new ArrayList<>();
@Nullable protected String excludingTables;
+ protected String includingDbs = ".*";
+ @Nullable protected String excludingDbs;
protected List<FileStoreTable> tables = new ArrayList<>();
protected Map<String, List<String>> partitionKeyMultiple = new HashMap<>();
@@ -144,6 +146,18 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
return this;
}
+ public SyncDatabaseActionBase includingDbs(@Nullable String includingDbs) {
+ if (includingDbs != null) {
+ this.includingDbs = includingDbs;
+ }
+ return this;
+ }
+
+ public SyncDatabaseActionBase excludingDbs(@Nullable String excludingDbs) {
+ this.excludingDbs = excludingDbs;
+ return this;
+ }
+
public SyncDatabaseActionBase withPartitionKeys(String... partitionKeys) {
this.partitionKeys.addAll(Arrays.asList(partitionKeys));
return this;
@@ -186,9 +200,11 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
requirePrimaryKeys(),
partitionKeyMultiple,
metadataConverters);
- Pattern includingPattern = Pattern.compile(includingTables);
- Pattern excludingPattern =
+ Pattern tblIncludingPattern = Pattern.compile(includingTables);
+ Pattern tblExcludingPattern =
excludingTables == null ? null :
Pattern.compile(excludingTables);
+ Pattern dbIncludingPattern = Pattern.compile(includingDbs);
+ Pattern dbExcludingPattern = excludingDbs == null ? null :
Pattern.compile(excludingDbs);
TableNameConverter tableNameConverter =
new TableNameConverter(
caseSensitive,
@@ -207,8 +223,10 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
return () ->
new RichCdcMultiplexRecordEventParser(
schemaBuilder,
- includingPattern,
- excludingPattern,
+ tblIncludingPattern,
+ tblExcludingPattern,
+ dbIncludingPattern,
+ dbExcludingPattern,
tableNameConverter,
createdTables);
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
index d497b588c2..c82039a9a0 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
@@ -24,7 +24,9 @@ import
org.apache.paimon.flink.action.MultipleParameterToolAdapter;
import java.util.Optional;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_DBS;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_DBS;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_TABLES;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MULTIPLE_TABLE_PARTITION_KEYS;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
@@ -59,6 +61,8 @@ public abstract class SyncDatabaseActionFactoryBase<T extends
SyncDatabaseAction
.withTableMapping(optionalConfigMap(params, TABLE_MAPPING))
.includingTables(params.get(INCLUDING_TABLES))
.excludingTables(params.get(EXCLUDING_TABLES))
+ .includingDbs(params.get(INCLUDING_DBS))
+ .excludingDbs(params.get(EXCLUDING_DBS))
.withPartitionKeyMultiple(
optionalConfigMapList(params,
MULTIPLE_TABLE_PARTITION_KEYS))
.withPartitionKeys();
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index 939410bf46..47367c4234 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -46,8 +46,10 @@ public class RichCdcMultiplexRecordEventParser implements
EventParser<RichCdcMul
LoggerFactory.getLogger(RichCdcMultiplexRecordEventParser.class);
@Nullable private final NewTableSchemaBuilder schemaBuilder;
- @Nullable private final Pattern includingPattern;
- @Nullable private final Pattern excludingPattern;
+ @Nullable private final Pattern tblIncludingPattern;
+ @Nullable private final Pattern tblExcludingPattern;
+ @Nullable private final Pattern dbIncludingPattern;
+ @Nullable private final Pattern dbExcludingPattern;
private final TableNameConverter tableNameConverter;
private final Set<String> createdTables;
@@ -55,24 +57,32 @@ public class RichCdcMultiplexRecordEventParser implements
EventParser<RichCdcMul
private final Set<String> includedTables = new HashSet<>();
private final Set<String> excludedTables = new HashSet<>();
+ private final Set<String> includedDbs = new HashSet<>();
+ private final Set<String> excludedDbs = new HashSet<>();
+
private RichCdcMultiplexRecord record;
private String currentTable;
+ private String currentDb;
private boolean shouldSynchronizeCurrentTable;
private RichEventParser currentParser;
public RichCdcMultiplexRecordEventParser(boolean caseSensitive) {
- this(null, null, null, new TableNameConverter(caseSensitive), new
HashSet<>());
+ this(null, null, null, null, null, new
TableNameConverter(caseSensitive), new HashSet<>());
}
public RichCdcMultiplexRecordEventParser(
@Nullable NewTableSchemaBuilder schemaBuilder,
- @Nullable Pattern includingPattern,
- @Nullable Pattern excludingPattern,
+ @Nullable Pattern tblIncludingPattern,
+ @Nullable Pattern tblExcludingPattern,
+ @Nullable Pattern dbIncludingPattern,
+ @Nullable Pattern dbExcludingPattern,
TableNameConverter tableNameConverter,
Set<String> createdTables) {
this.schemaBuilder = schemaBuilder;
- this.includingPattern = includingPattern;
- this.excludingPattern = excludingPattern;
+ this.tblIncludingPattern = tblIncludingPattern;
+ this.tblExcludingPattern = tblExcludingPattern;
+ this.dbIncludingPattern = dbIncludingPattern;
+ this.dbExcludingPattern = dbExcludingPattern;
this.tableNameConverter = tableNameConverter;
this.createdTables = createdTables;
}
@@ -81,6 +91,7 @@ public class RichCdcMultiplexRecordEventParser implements
EventParser<RichCdcMul
public void setRawEvent(RichCdcMultiplexRecord record) {
this.record = record;
this.currentTable = record.tableName();
+ this.currentDb = record.databaseName();
this.shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable();
if (shouldSynchronizeCurrentTable) {
this.currentParser = parsers.computeIfAbsent(currentTable, t ->
new RichEventParser());
@@ -124,7 +135,41 @@ public class RichCdcMultiplexRecordEventParser implements
EventParser<RichCdcMul
return Optional.empty();
}
+ private boolean shouldSynchronizeCurrentDb() {
+ // In case the record is incomplete, we let the null value pass
validation
+ // and handle the null value when we really need it
+ if (currentDb == null) {
+ return true;
+ }
+ if (includedDbs.contains(currentDb)) {
+ return true;
+ }
+ if (excludedDbs.contains(currentDb)) {
+ return false;
+ }
+ boolean shouldSynchronize = true;
+ if (dbIncludingPattern != null) {
+ shouldSynchronize =
dbIncludingPattern.matcher(currentDb).matches();
+ }
+ if (dbExcludingPattern != null) {
+ shouldSynchronize =
+ shouldSynchronize &&
!dbExcludingPattern.matcher(currentDb).matches();
+ }
+ if (!shouldSynchronize) {
+ LOG.debug(
+ "Source database {} won't be synchronized because it was
excluded. ",
+ currentDb);
+ excludedDbs.add(currentDb);
+ return false;
+ }
+ includedDbs.add(currentDb);
+ return true;
+ }
+
private boolean shouldSynchronizeCurrentTable() {
+ if (!shouldSynchronizeCurrentDb()) {
+ return false;
+ }
// In case the record is incomplete, we let the null value pass
validation
// and handle the null value when we really need it
if (currentTable == null) {
@@ -139,12 +184,12 @@ public class RichCdcMultiplexRecordEventParser implements
EventParser<RichCdcMul
}
boolean shouldSynchronize = true;
- if (includingPattern != null) {
- shouldSynchronize =
includingPattern.matcher(currentTable).matches();
+ if (tblIncludingPattern != null) {
+ shouldSynchronize =
tblIncludingPattern.matcher(currentTable).matches();
}
- if (excludingPattern != null) {
+ if (tblExcludingPattern != null) {
shouldSynchronize =
- shouldSynchronize &&
!excludingPattern.matcher(currentTable).matches();
+ shouldSynchronize &&
!tblExcludingPattern.matcher(currentTable).matches();
}
if (!shouldSynchronize) {
LOG.debug(
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
new file mode 100644
index 0000000000..5247225caf
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc;
+
+import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncDatabaseAction;
+import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.types.RowKind;
+
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Tests for {@link SyncDatabaseActionBase}. */
+public class SyncDatabaseActionBaseTest {
+ private static final String ANY_DB = "any_db";
+ private static final String WHITE_DB = "white_db";
+ private static final String BLACK_DB = "black_db";
+ private static final String WHITE_TBL = "white_tbl";
+ private static final String BLACK_TBL = "black_tbl";
+
+ private SyncDatabaseActionBase kafkaSyncDbAction;
+ private RichCdcMultiplexRecord whiteAnyDbCdcRecord;
+ private RichCdcMultiplexRecord blackAnyDbCdcRecord;
+ private RichCdcMultiplexRecord whiteCdcRecord;
+ private RichCdcMultiplexRecord blackCdcRecord;
+ private RichCdcMultiplexRecord whiteDbBlackTblCdcRecord;
+ private RichCdcMultiplexRecord blackDbWhiteTblCdcRecord;
+
+ @TempDir private java.nio.file.Path tmp;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ LocalFileIO localFileIO = new LocalFileIO();
+ Path defaultDb = new Path(tmp.toString(), "default.db");
+ localFileIO.mkdirs(defaultDb);
+
+ kafkaSyncDbAction =
+ new KafkaSyncDatabaseAction(
+ tmp.toString(), "default", new HashMap<>(), new
HashMap<>());
+
+ Map<String, String> rawData = new HashMap<>();
+ rawData.put("field", "value");
+
+ CdcRecord cdcData = new CdcRecord(RowKind.INSERT, rawData);
+ whiteAnyDbCdcRecord =
+ new RichCdcMultiplexRecord(
+ ANY_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(),
cdcData);
+ blackAnyDbCdcRecord =
+ new RichCdcMultiplexRecord(
+ ANY_DB, BLACK_TBL, Arrays.asList(), Arrays.asList(),
cdcData);
+ whiteCdcRecord =
+ new RichCdcMultiplexRecord(
+ WHITE_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(),
cdcData);
+ blackCdcRecord =
+ new RichCdcMultiplexRecord(
+ BLACK_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(),
cdcData);
+
+ whiteDbBlackTblCdcRecord =
+ new RichCdcMultiplexRecord(
+ WHITE_DB, BLACK_TBL, Arrays.asList(), Arrays.asList(),
cdcData);
+ blackDbWhiteTblCdcRecord =
+ new RichCdcMultiplexRecord(
+ BLACK_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(),
cdcData);
+ }
+
+ @Test
+ public void testSyncTablesWithoutDbLists() throws NoSuchMethodException,
IOException {
+
+ kafkaSyncDbAction.includingTables(WHITE_TBL);
+ kafkaSyncDbAction.excludingTables(BLACK_TBL);
+
+ RichCdcMultiplexRecordEventParser parser =
+ (RichCdcMultiplexRecordEventParser)
+ kafkaSyncDbAction.buildEventParserFactory().create();
+ List<CdcRecord> parsedRecords;
+
+ parser.setRawEvent(whiteAnyDbCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(1, parsedRecords.size());
+
+ parser.setRawEvent(blackAnyDbCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(0, parsedRecords.size());
+ }
+
+ @Test
+ public void testSyncTablesWithDbList() {
+ kafkaSyncDbAction.includingDbs(WHITE_DB);
+ kafkaSyncDbAction.excludingDbs(BLACK_DB);
+ RichCdcMultiplexRecordEventParser parser =
+ (RichCdcMultiplexRecordEventParser)
+ kafkaSyncDbAction.buildEventParserFactory().create();
+ List<CdcRecord> parsedRecords;
+
+ parser.setRawEvent(whiteAnyDbCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(0, parsedRecords.size());
+
+ parser.setRawEvent(blackAnyDbCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(0, parsedRecords.size());
+
+ // white db and white table
+ parser.setRawEvent(whiteCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(1, parsedRecords.size());
+
+ parser.setRawEvent(blackAnyDbCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(0, parsedRecords.size());
+ }
+
+ @Test
+ public void testSyncTablesCrossDB() {
+ kafkaSyncDbAction.includingDbs(WHITE_DB);
+ kafkaSyncDbAction.excludingDbs(BLACK_DB);
+ kafkaSyncDbAction.excludingTables(BLACK_TBL);
+ RichCdcMultiplexRecordEventParser parser =
+ (RichCdcMultiplexRecordEventParser)
+ kafkaSyncDbAction.buildEventParserFactory().create();
+ List<CdcRecord> parsedRecords;
+ parser.setRawEvent(whiteDbBlackTblCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(0, parsedRecords.size());
+ parser.setRawEvent(blackDbWhiteTblCdcRecord);
+ parsedRecords = parser.parseRecords();
+ Assert.assertEquals(0, parsedRecords.size());
+ }
+}