This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d797a602 [FLINK-34903][cdc][mysql] Add mysql-pipeline-connector with 
tables.exclude option to exclude unnecessary tables (#3186)
0d797a602 is described below

commit 0d797a6021ea2f3973f8dc74c0c741b695e85a12
Author: Thorne <46524102+shiy...@users.noreply.github.com>
AuthorDate: Fri Apr 19 15:49:32 2024 +0800

    [FLINK-34903][cdc][mysql] Add mysql-pipeline-connector with tables.exclude 
option to exclude unnecessary tables (#3186)
---
 docs/content.zh/docs/connectors/mysql.md           |  8 ++++
 docs/content/docs/connectors/mysql.md              |  8 ++++
 .../mysql/factory/MySqlDataSourceFactory.java      | 27 ++++++++++---
 .../mysql/source/MySqlDataSourceOptions.java       | 12 ++++++
 .../mysql/source/MySqlDataSourceFactoryTest.java   | 45 ++++++++++++++++++++++
 5 files changed, 95 insertions(+), 5 deletions(-)

diff --git a/docs/content.zh/docs/connectors/mysql.md 
b/docs/content.zh/docs/connectors/mysql.md
index 564dd3dbf..3410cdea0 100644
--- a/docs/content.zh/docs/connectors/mysql.md
+++ b/docs/content.zh/docs/connectors/mysql.md
@@ -107,6 +107,14 @@ pipeline:
           需要注意的是,点号(.)被视为数据库和表名的分隔符。 
如果需要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。<br>
           例如,db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*</td>
     </tr>
+    <tr>
+      <td>tables.exclude</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>需要排除的 MySQL 数据库的表名,参数会在tables参数后发生排除作用。表名支持正则表达式,以排除满足正则表达式的多个表。<br>
+          用法和tables参数相同</td>
+    </tr>
     <tr>
       <td>schema-change.enabled</td>
       <td>optional</td>
diff --git a/docs/content/docs/connectors/mysql.md 
b/docs/content/docs/connectors/mysql.md
index c6ef38252..879920fec 100644
--- a/docs/content/docs/connectors/mysql.md
+++ b/docs/content/docs/connectors/mysql.md
@@ -109,6 +109,14 @@ pipeline:
           If there is a need to use a dot (.) in a regular expression to match 
any character, it is necessary to escape the dot with a backslash.<br>
           eg. db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*</td>
     </tr>
+    <tr>
+      <td>tables.exclude</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Table names of the MySQL database to exclude. This option is applied after the tables option: tables matching it are removed from the tables captured by the tables option. Table names also support regular expressions to exclude multiple tables that match them.<br>
+          The usage is the same as the tables option.</td>
+    </tr>
     <tr>
       <td>schema-change.enabled</td>
       <td>optional</td>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index 744041d25..dc9972fe0 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -42,9 +42,12 @@ import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
@@ -70,6 +73,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_ID;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
 import static org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare;
 import static org.apache.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
@@ -93,6 +97,7 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
         String username = config.get(USERNAME);
         String password = config.get(PASSWORD);
         String tables = config.get(TABLES);
+        String tablesExclude = config.get(TABLES_EXCLUDE);
 
         String serverId = validateAndGetServerId(config);
         ZoneId serverTimeZone = getServerTimeZone(config);
@@ -151,12 +156,26 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
                         .jdbcProperties(getJdbcProperties(configMap));
 
         Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build();
-        String[] capturedTables = getTableList(configFactory.createConfig(0), selectors);
-        if (capturedTables.length == 0) {
+        List<String> capturedTables = getTableList(configFactory.createConfig(0), selectors);
+        if (capturedTables.isEmpty()) {
             throw new IllegalArgumentException(
                     "Cannot find any table by the option 'tables' = " + tables);
         }
-        configFactory.tableList(capturedTables);
+        if (tablesExclude != null) {
+            // Tables matched by 'tables.exclude' are removed from those matched by 'tables'.
+            // getTableList returns a mutable ArrayList; the HashSet makes each lookup O(1).
+            Selectors selectExclude =
+                    new Selectors.SelectorsBuilder().includeTables(tablesExclude).build();
+            Set<String> excludeTables =
+                    new HashSet<>(getTableList(configFactory.createConfig(0), selectExclude));
+            capturedTables.removeIf(excludeTables::contains);
+            if (capturedTables.isEmpty()) {
+                throw new IllegalArgumentException(
+                        "Cannot find any table with by the option 'tables.exclude'  = "
+                                + tablesExclude);
+            }
+        }
+        configFactory.tableList(capturedTables.toArray(new String[0]));
 
         return new MySqlDataSource(configFactory);
     }
@@ -211,11 +230,15 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
     private static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset";
     private static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
 
-    private static String[] getTableList(MySqlSourceConfig sourceConfig, Selectors selectors) {
+    /**
+     * Lists the tables returned by {@link MySqlSchemaUtils#listTables} that match {@code
+     * selectors}, as {@link TableId#toString()} strings in a mutable {@link ArrayList}.
+     */
+    private static List<String> getTableList(MySqlSourceConfig sourceConfig, Selectors selectors) {
         return MySqlSchemaUtils.listTables(sourceConfig, null).stream()
                 .filter(selectors::isMatch)
                 .map(TableId::toString)
-                .toArray(String[]::new);
+                .collect(Collectors.toCollection(ArrayList::new));
     }
 
     private static StartupOptions getStartupOptions(Configuration config) {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
index 7c7dcadb8..e852eb3d7 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java
@@ -231,4 +231,18 @@ public class MySqlDataSourceOptions {
                     .defaultValue(true)
                     .withDescription(
                             "Whether send schema change events, by default is true. If set to false, the schema changes will not be sent.");
+
+    /** Tables matching this option are removed from the tables selected by {@code TABLES}. */
+    @Experimental
+    public static final ConfigOption<String> TABLES_EXCLUDE =
+            ConfigOptions.key("tables.exclude")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the MySQL tables to exclude. This option is applied after the 'tables' option. "
+                                    + "Regular expressions are supported. "
+                                    + "It is important to note that the dot (.) is treated as a delimiter for database and table names. "
+                                    + "If there is a need to use a dot (.) in a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with a backslash. "
+                                    + "eg. db0.\\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\\.*");
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
index 025415abb..59dcdb6cf 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
@@ -32,6 +32,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
+import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
 import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
 import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
 import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
@@ -79,6 +80,52 @@ public class MySqlDataSourceFactoryTest extends MySqlSourceTestBase {
                 .hasMessageContaining("Cannot find any table by the option 'tables' = " + tables);
     }
 
+    /** 'tables.exclude' removes the matching table from the tables selected by 'tables'. */
+    @Test
+    public void testExcludeTable() {
+        inventoryDatabase.createAndInitialize();
+        Map<String, String> options = new HashMap<>();
+        options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
+        options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+        options.put(USERNAME.key(), TEST_USER);
+        options.put(PASSWORD.key(), TEST_PASSWORD);
+        options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".\\.*");
+        String tableExclude = inventoryDatabase.getDatabaseName() + ".orders";
+        options.put(TABLES_EXCLUDE.key(), tableExclude);
+        Factory.Context context = new MockContext(Configuration.fromMap(options));
+
+        MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
+        MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
+        assertThat(dataSource.getSourceConfig().getTableList())
+                .doesNotContain(tableExclude)
+                .isEqualTo(
+                        Arrays.asList(
+                                inventoryDatabase.getDatabaseName() + ".customers",
+                                inventoryDatabase.getDatabaseName() + ".products"));
+    }
+
+    /** Excluding every table selected by 'tables' makes data source creation fail. */
+    @Test
+    public void testExcludeAllTable() {
+        inventoryDatabase.createAndInitialize();
+        Map<String, String> options = new HashMap<>();
+        options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
+        options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+        options.put(USERNAME.key(), TEST_USER);
+        options.put(PASSWORD.key(), TEST_PASSWORD);
+        options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*");
+        String tableExclude = inventoryDatabase.getDatabaseName() + ".prod\\.*";
+        options.put(TABLES_EXCLUDE.key(), tableExclude);
+        Factory.Context context = new MockContext(Configuration.fromMap(options));
+
+        MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
+        assertThatThrownBy(() -> factory.createDataSource(context))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(
+                        "Cannot find any table with by the option 'tables.exclude'  = "
+                                + tableExclude);
+    }
+
     class MockContext implements Factory.Context {
 
         Configuration factoryConfiguration;

Reply via email to