This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 538b8faa1 [FLINK-38779] Route module supports standard RegExp 
replacement rules (#4190)
538b8faa1 is described below

commit 538b8faa1d8826f8d15a902a94a2e77b13e12093
Author: yuxiqian <[email protected]>
AuthorDate: Thu Jan 15 14:43:47 2026 +0800

    [FLINK-38779] Route module supports standard RegExp replacement rules 
(#4190)
---
 docs/content.zh/docs/core-concept/route.md         |  23 +++-
 docs/content/docs/core-concept/route.md            |  23 +++-
 .../flink/cdc/common/route}/TableIdRouter.java     | 110 +++++++++++++----
 .../mysql/factory/MySqlDataSourceFactory.java      |  57 +--------
 .../operators/schema/common/SchemaDerivator.java   |   1 +
 .../operators/schema/common/SchemaRegistry.java    |   1 +
 .../schema/distributed/SchemaOperator.java         |   2 +-
 .../schema/regular/BatchSchemaOperator.java        |   2 +-
 .../operators/schema/regular/SchemaOperator.java   |   2 +-
 .../common => common/route}/TableIdRouterTest.java | 131 ++++++++++++++++++++-
 .../schema/common/SchemaDerivatorTest.java         |   1 +
 .../operators/schema/common/SchemaTestBase.java    |  10 +-
 12 files changed, 278 insertions(+), 85 deletions(-)

diff --git a/docs/content.zh/docs/core-concept/route.md 
b/docs/content.zh/docs/core-concept/route.md
index fa1a95353..774915d3d 100644
--- a/docs/content.zh/docs/core-concept/route.md
+++ b/docs/content.zh/docs/core-concept/route.md
@@ -85,4 +85,25 @@ route:
     description: route all tables in source_db to sink_db
 ```
 
-Then, all tables under the `source_db` database will be synchronized into the `sink_db` database.
\ No newline at end of file
+Then, all tables under the `source_db` database will be synchronized into the `sink_db` database.
+
+## Advanced: Replacement Rules Based on RegExp Capturing Groups
+
+You can define regular expression capturing groups in the `source-table` field:
+
+```yaml
+route:
+  - source-table: db_(\.*).(\.*)_tbl
+    sink-table: sink_db_$1.sink_table_$2
+```
+
+Here, we create two capturing groups that match the suffix after the database name prefix `db_` and the prefix before the table name suffix `_tbl`, respectively.
+
+Taking the upstream table `db_foo.bar_tbl` as an example, `(foo, bar)` will be extracted by the capturing groups and bound to the `$1` and `$2` variables in turn.
+As a result, this table will be routed to the downstream table `sink_db_foo.sink_table_bar`.
+
+{{< hint info >}}
+
+Note: replacement rules based on RegExp capturing groups cannot be combined with the `replace-symbol` option.
+
+{{< /hint >}}
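
The `$1`/`$2` substitution above follows standard `java.util.regex` replacement semantics. A minimal, self-contained sketch of that behaviour (the pattern below is a hand-converted standard-RegExp equivalent of `db_(\.*).(\.*)_tbl`, shown purely for illustration):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CapturingGroupSketch {
    public static void main(String[] args) {
        // Literal dot between database and table name; capturing groups wrap
        // the database suffix and the table prefix.
        Pattern sourceTable = Pattern.compile("db_(.*)\\.(.*)_tbl");
        Matcher matcher = sourceTable.matcher("db_foo.bar_tbl");
        if (matcher.matches()) {
            // $1 = "foo", $2 = "bar"; prints: sink_db_foo.sink_table_bar
            System.out.println(matcher.replaceAll("sink_db_$1.sink_table_$2"));
        }
    }
}
```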
diff --git a/docs/content/docs/core-concept/route.md 
b/docs/content/docs/core-concept/route.md
index ca5855d58..394f5a56a 100644
--- a/docs/content/docs/core-concept/route.md
+++ b/docs/content/docs/core-concept/route.md
@@ -86,4 +86,25 @@ route:
     description: route all tables in source_db to sink_db
 ```
 
-Then, all tables including `source_db.XXX` will be routed to `sink_db.XXX` 
without hassle.
\ No newline at end of file
+Then, all tables including `source_db.XXX` will be routed to `sink_db.XXX` 
without hassle.
+
+## Advanced: RegExp Capturing & Replacement Rules
+
+It is also possible to define regular expression capturing groups in the `source-table` field, like 
this:
+
+```yaml
+route:
+  - source-table: db_(\.*).(\.*)_tbl
+    sink-table: sink_db_$1.sink_table_$2
+```
+
+Here, we create two capturing groups that match the suffix after the database name prefix `db_` and the prefix before the table name suffix `_tbl`, respectively.
+
+For the upstream table `db_foo.bar_tbl`, the capturing groups extract `(foo, bar)` and bind the values to `$1` and `$2` respectively.
+As a result, this table will be routed to the downstream table 
`sink_db_foo.sink_table_bar`.
+
+{{< hint info >}}
+
+Note: replacement rules based on standard RegExp capturing groups cannot be combined with the `replace-symbol` option.
+
+{{< /hint >}}
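
The same rule can also be exercised end-to-end through the routing classes touched by this commit. A small usage sketch, mirroring the helper in `TableIdRouterTest` further down this diff (the two-argument `RouteRule` constructor and `TableIdRouter#route` are taken from that test code):

```java
import java.util.Collections;

import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.route.TableIdRouter;

public class RouteRuleSketch {
    public static void main(String[] args) {
        // CDC-style rule: `\.` is the escaped wildcard, the bare `.` separates
        // database and table names; TableIdRouter converts it to standard RegExp.
        TableIdRouter router =
                new TableIdRouter(
                        Collections.singletonList(
                                new RouteRule(
                                        "db_(\\.*).(\\.*)_tbl",
                                        "sink_db_$1.sink_table_$2")));

        // Prints: [sink_db_foo.sink_table_bar]
        System.out.println(router.route(TableId.parse("db_foo.bar_tbl")));
    }
}
```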
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouter.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java
similarity index 50%
rename from 
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouter.java
rename to 
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java
index 6131b7f4c..06126b88a 100755
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouter.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java
@@ -15,23 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cdc.runtime.operators.schema.common;
+package org.apache.flink.cdc.common.route;
 
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
 import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.schema.Selectors;
 
 import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
 import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
 import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nonnull;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 import java.util.stream.Collectors;
 
@@ -39,20 +45,75 @@ import java.util.stream.Collectors;
  * Calculates how upstream data change events should be dispatched to 
downstream tables. Returns one
  * or many destination Table IDs based on provided routing rules.
  */
+@PublicEvolving
 public class TableIdRouter {
 
-    private final List<Tuple3<Selectors, String, String>> routes;
-    private final LoadingCache<TableId, List<TableId>> routingCache;
+    private static final Logger LOG = 
LoggerFactory.getLogger(TableIdRouter.class);
     private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1);
 
+    private final List<Tuple3<Pattern, String, String>> routes;
+    private final LoadingCache<TableId, List<TableId>> routingCache;
+
+    private static final String DOT_PLACEHOLDER = "_dot_placeholder_";
+
+    /**
+     * Currently, the supported regular expression syntax is not exactly the same as in {@link
+     * Selectors}.
+     *
+     * <p>The main discrepancies are:
+     *
+     * <p>1) {@link Selectors} uses {@code ,} to split table names instead of {@code |}.
+     *
+     * <p>2) If there is a need to use a dot ({@code .}) in a regular expression to match any
+     * character, it is necessary to escape the dot with a backslash.
+     *
+     * <p>3) The unescaped {@code .} is used as the separator of database and table name. When
+     * converting to a standard regular expression, it is expected to be escaped so that it
+     * matches the dot ({@code .}) literally instead of acting as the meta-character.
+     */
+    public static String convertTableListToRegExpPattern(String tables) {
+        LOG.info("Rewriting CDC style table capture list: {}", tables);
+
+        // In CDC-style table matching, table names could be separated by `,` 
character.
+        // Convert it to `|` as it's standard RegEx syntax.
+        tables =
+                
Arrays.stream(tables.split(",")).map(String::trim).collect(Collectors.joining("|"));
+        LOG.info("Expression after replacing comma with vert separator: {}", 
tables);
+
+        // Essentially, we're just trying to swap escaped `\\.` and unescaped 
`.`.
+        // In our table matching syntax, `\\.` means RegEx token matcher and 
`.` means database &
+        // table name separator.
+        // On the contrary, while we're matching TableId string, `\\.` means 
matching the "dot"
+        // literal and `.` is the meta-character.
+
+        // Step 1: replace the escaped dot (`\.`) with a placeholder so the next step leaves it
+        // untouched (rendered as `$` in the examples below).
+        // For example, `db\.*.tbl\.*` => `db$*.tbl$*`
+        String unescapedTables = tables.replace("\\.", DOT_PLACEHOLDER);
+        LOG.info("Expression after un-escaping dots as RegEx meta-character: 
{}", unescapedTables);
+
+        // Step 2: replace all remaining dots (`.`) to quoted version (`\.`), 
as a separator between
+        // database and table names.
+        // For example, `db$*.tbl$*` => `db$*\.tbl$*`
+        String unescapedTablesWithDbTblSeparator = 
unescapedTables.replace(".", "\\.");
+        LOG.info("Re-escaping dots as TableId delimiter: {}", 
unescapedTablesWithDbTblSeparator);
+
+        // Step 3: restore placeholder to normal RegEx matcher (`.`)
+        // For example, `db$*\.tbl$*` => `db.*\.tbl.*`
+        String standardRegExpTableCaptureList =
+                unescapedTablesWithDbTblSeparator.replace(DOT_PLACEHOLDER, 
".");
+        LOG.info("Final standard RegExp table capture list: {}", 
standardRegExpTableCaptureList);
+
+        return standardRegExpTableCaptureList;
+    }
+
     public TableIdRouter(List<RouteRule> routingRules) {
         this.routes = new ArrayList<>();
         for (RouteRule rule : routingRules) {
             try {
-                String tableInclusions = rule.sourceTable;
-                Selectors selectors =
-                        new 
Selectors.SelectorsBuilder().includeTables(tableInclusions).build();
-                routes.add(new Tuple3<>(selectors, rule.sinkTable, 
rule.replaceSymbol));
+                routes.add(
+                        new Tuple3<>(
+                                
Pattern.compile(convertTableListToRegExpPattern(rule.sourceTable)),
+                                rule.sinkTable,
+                                rule.replaceSymbol));
             } catch (PatternSyntaxException e) {
                 throw new IllegalArgumentException(
                         String.format(
@@ -80,7 +141,7 @@ public class TableIdRouter {
     private List<TableId> calculateRoute(TableId sourceTableId) {
         List<TableId> routedTableIds =
                 routes.stream()
-                        .filter(route -> route.f0.isMatch(sourceTableId))
+                        .filter(route -> matches(route.f0, sourceTableId))
                         .map(route -> resolveReplacement(sourceTableId, route))
                         .collect(Collectors.toList());
         if (routedTableIds.isEmpty()) {
@@ -90,9 +151,14 @@ public class TableIdRouter {
     }
 
     private TableId resolveReplacement(
-            TableId originalTable, Tuple3<Selectors, String, String> route) {
+            TableId originalTable, Tuple3<Pattern, String, String> route) {
         if (route.f2 != null) {
             return TableId.parse(route.f1.replace(route.f2, 
originalTable.getTableName()));
+        } else {
+            Matcher matcher = route.f0.matcher(originalTable.toString());
+            if (matcher.find()) {
+                return TableId.parse(matcher.replaceAll(route.f1));
+            }
         }
         return TableId.parse(route.f1);
     }
@@ -111,18 +177,16 @@ public class TableIdRouter {
         if (routes.isEmpty()) {
             return new ArrayList<>();
         }
-        List<Set<TableId>> routedTableIds =
-                routes.stream()
-                        .map(
-                                route -> {
-                                    return tableIdSet.stream()
-                                            .filter(
-                                                    tableId -> {
-                                                        return 
route.f0.isMatch(tableId);
-                                                    })
-                                            .collect(Collectors.toSet());
-                                })
-                        .collect(Collectors.toList());
-        return routedTableIds;
+        return routes.stream()
+                .map(
+                        route ->
+                                tableIdSet.stream()
+                                        .filter(tableId -> matches(route.f0, 
tableId))
+                                        .collect(Collectors.toSet()))
+                .collect(Collectors.toList());
+    }
+
+    private static boolean matches(Pattern pattern, TableId tableId) {
+        return pattern.matcher(tableId.toString()).matches();
     }
 }
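
As a quick sanity check of the three-step rewrite implemented above, a usage sketch of the new public helper; the expected outputs follow from the rules in the Javadoc and match the assertions added to `TableIdRouterTest` later in this commit:

```java
import org.apache.flink.cdc.common.route.TableIdRouter;

public class ConvertPatternSketch {
    public static void main(String[] args) {
        // Comma-separated entries become a `|` alternation, and the bare `.`
        // separating database and table names is escaped to a literal dot.
        // Prints: foo\.bar|foo\.baz
        System.out.println(
                TableIdRouter.convertTableListToRegExpPattern("foo.bar,foo.baz"));

        // The CDC-style escaped wildcard `\.` turns back into the RegExp
        // any-character matcher. Prints: db_(.*)\.(.*)_tbl
        System.out.println(
                TableIdRouter.convertTableListToRegExpPattern("db_(\\.*).(\\.*)_tbl"));
    }
}
```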
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index b775cebb0..1b3540da0 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -30,7 +30,6 @@ import org.apache.flink.cdc.common.schema.Selectors;
 import org.apache.flink.cdc.common.source.DataSource;
 import org.apache.flink.cdc.common.utils.StringUtils;
 import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;
-import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions;
 import 
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
 import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
 import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
@@ -45,7 +44,6 @@ import org.apache.flink.table.catalog.ObjectPath;
 
 import com.mysql.cj.conf.PropertyKey;
 import io.debezium.relational.RelationalDatabaseConnectorConfig;
-import io.debezium.relational.Tables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,6 +61,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.cdc.common.route.TableIdRouter.convertTableListToRegExpPattern;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_META_GROUP_SIZE;
@@ -231,7 +230,7 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
         }
 
         if (scanBinlogNewlyAddedTableEnabled) {
-            String newTables = validateTableAndReturnDebeziumStyle(tables);
+            String newTables = convertTableListToRegExpPattern(tables);
             configFactory.tableList(newTables);
             configFactory.excludeTableList(tablesExclude);
 
@@ -516,58 +515,6 @@ public class MySqlDataSourceFactory implements 
DataSourceFactory {
 
     private static final String DOT_PLACEHOLDER = "_$dot_placeholder$_";
 
-    /**
-     * Currently, The supported regular syntax is not exactly the same in 
{@link Selectors} and
-     * {@link Tables.TableFilter}.
-     *
-     * <p>The main distinction are :
-     *
-     * <p>1) {@link Selectors} use {@code ,} to split table names and {@link 
Tables.TableFilter} use
-     * `|` to split table names.
-     *
-     * <p>2) If there is a need to use a dot ({@code .}) in a regular 
expression to match any
-     * character, it is necessary to escape the dot with a backslash, refer to 
{@link
-     * MySqlDataSourceOptions#TABLES}.
-     *
-     * <p>3) The unescaped {@code .} is used as the separator of database and 
table name. When
-     * converting to Debezium style, it is expected to be escaped to match the 
dot ({@code .})
-     * literally instead of the meta-character.
-     */
-    private String validateTableAndReturnDebeziumStyle(String tables) {
-        LOG.info("Rewriting CDC style table capture list: {}", tables);
-
-        // In CDC-style table matching, table names could be separated by `,` 
character.
-        // Convert it to `|` as it's standard RegEx syntax.
-        tables =
-                
Arrays.stream(tables.split(",")).map(String::trim).collect(Collectors.joining("|"));
-        LOG.info("Expression after replacing comma with vert separator: {}", 
tables);
-
-        // Essentially, we're just trying to swap escaped `\\.` and unescaped 
`.`.
-        // In our table matching syntax, `\\.` means RegEx token matcher and 
`.` means database &
-        // table name separator.
-        // On the contrary, while we're matching TableId string, `\\.` means 
matching the "dot"
-        // literal and `.` is the meta-character.
-
-        // Step 1: escape the dot with a backslash, but keep it as a 
placeholder (like `$`).
-        // For example, `db\.*.tbl\.*` => `db$*.tbl$*`
-        String unescapedTables = tables.replace("\\.", DOT_PLACEHOLDER);
-        LOG.info("Expression after unescaping dots as RegEx meta-character: 
{}", unescapedTables);
-
-        // Step 2: replace all remaining dots (`.`) to quoted version (`\.`), 
as a separator between
-        // database and table names.
-        // For example, `db$*.tbl$*` => `db$*\.tbl$*`
-        String unescapedTablesWithDbTblSeparator = 
unescapedTables.replace(".", "\\.");
-        LOG.info("Re-escaping dots as TableId delimiter: {}", 
unescapedTablesWithDbTblSeparator);
-
-        // Step 3: restore placeholder to normal RegEx matcher (`.`)
-        // For example, `db$*\.tbl$*` => `db.*\.tbl.*`
-        String debeziumStyleTableCaptureList =
-                unescapedTablesWithDbTblSeparator.replace(DOT_PLACEHOLDER, 
".");
-        LOG.info("Final Debezium-style table capture list: {}", 
debeziumStyleTableCaptureList);
-
-        return debeziumStyleTableCaptureList;
-    }
-
     /** Replaces the default timezone placeholder with session timezone, if 
applicable. */
     private static ZoneId getServerTimeZone(Configuration config) {
         final String serverTimeZone = config.get(SERVER_TIME_ZONE);
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java
index a2659561e..8d56f7258 100755
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java
@@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEventWithPreSchema;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.route.TableIdRouter;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
index 8bea59548..640fb765c 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
+import org.apache.flink.cdc.common.route.TableIdRouter;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import 
org.apache.flink.cdc.runtime.operators.schema.common.event.FlushSuccessEvent;
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
index f31cbabdc..15c0571ba 100755
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
@@ -26,11 +26,11 @@ import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
+import org.apache.flink.cdc.common.route.TableIdRouter;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
 import 
org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
 import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
-import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
 import 
org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics;
 import 
org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeRequest;
 import 
org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeResponse;
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java
index 27012edca..df990425a 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java
@@ -25,11 +25,11 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
+import org.apache.flink.cdc.common.route.TableIdRouter;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
 import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager;
-import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
index feb1a38d5..901ba168d 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
@@ -26,11 +26,11 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
+import org.apache.flink.cdc.common.route.TableIdRouter;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
 import 
org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
 import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
-import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
 import 
org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics;
 import 
org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeRequest;
 import 
org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse;
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java
similarity index 53%
rename from 
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterTest.java
rename to 
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java
index ba21b63eb..b8e487700 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cdc.runtime.operators.schema.common;
+package org.apache.flink.cdc.common.route;
 
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.runtime.operators.schema.common.SchemaTestBase;
 
 import org.junit.jupiter.api.Test;
 
@@ -26,6 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
 
@@ -38,6 +40,17 @@ public class TableIdRouterTest extends SchemaTestBase {
                 .collect(Collectors.toList());
     }
 
+    private static String testConvert(String input) {
+        return TableIdRouter.convertTableListToRegExpPattern(input);
+    }
+
+    @Test
+    void testConvertingDebeziumTableIdToStandardRegex() {
+        assertThat(testConvert("foo.bar")).isEqualTo("foo\\.bar");
+        
assertThat(testConvert("foo.bar,foo.baz")).isEqualTo("foo\\.bar|foo\\.baz");
+        assertThat(testConvert("db.\\.*")).isEqualTo("db\\..*");
+    }
+
     @Test
     void testImplicitRoute() {
         
assertThat(testRoute("db_0.table_1")).containsExactlyInAnyOrder("db_0.table_1");
@@ -129,6 +142,122 @@ public class TableIdRouterTest extends SchemaTestBase {
                                         TableId.parse("db_5.table_1"),
                                         TableId.parse("db_5.table_2"),
                                         TableId.parse("db_5.table_3"))),
+                        new HashSet<>(),
+                        new HashSet<>(),
                         new HashSet<>());
     }
+
+    @Test
+    void testRegExpCapturingGroupExpression() {
+        assertThat(Stream.of("re_1.table_1", "re_22.table_22", 
"re_333.table_333"))
+                .map(TableIdRouterTest::testRoute)
+                .map(List::toString)
+                .containsExactly(
+                        "[database.another_table_with_111_index]",
+                        "[database.another_table_with_222222_index]",
+                        "[database.another_table_with_333333333_index]");
+
+        assertThat(Stream.of("inv_1.table_foo", "inv_22.table_bar", 
"inv_333.table_baz"))
+                .map(TableIdRouterTest::testRoute)
+                .map(List::toString)
+                .containsExactly("[table_foo.inv_1]", "[table_bar.inv_22]", 
"[table_baz.inv_333]");
+    }
+
+    private static List<String> testStdRegExpRoute(
+            String sourceRouteRule, String sinkRouteRule, List<String> 
sourceTables) {
+        TableIdRouter router =
+                new TableIdRouter(List.of(new RouteRule(sourceRouteRule, 
sinkRouteRule)));
+        return sourceTables.stream()
+                .map(TableId::parse)
+                .map(router::route)
+                .map(List::toString)
+                .collect(Collectors.toList());
+    }
+
+    @Test
+    void testRegExpComplexRouting() {
+        // Capture the entire database.
+        List<String> tablesToRoute =
+                List.of("db1.tbl1", "db1.tbl2", "db1.tbl3", "db2.tbl2", 
"db2.tbl3", "db3.tbl3");
+        assertThat(testStdRegExpRoute("db1.(\\.*)", "db1.combined", 
tablesToRoute))
+                .containsExactly(
+                        "[db1.combined]",
+                        "[db1.combined]",
+                        "[db1.combined]",
+                        "[db2.tbl2]",
+                        "[db2.tbl3]",
+                        "[db3.tbl3]");
+
+        // Capture the entire database and append prefixes.
+        assertThat(testStdRegExpRoute("db1.(\\.*)", "db1.pre_$1", 
tablesToRoute))
+                .containsExactly(
+                        "[db1.pre_tbl1]",
+                        "[db1.pre_tbl2]",
+                        "[db1.pre_tbl3]",
+                        "[db2.tbl2]",
+                        "[db2.tbl3]",
+                        "[db3.tbl3]");
+
+        // Capture the entire database and append suffixes.
+        assertThat(testStdRegExpRoute("db1.(\\.*)", "db1.$1_suf", 
tablesToRoute))
+                .containsExactly(
+                        "[db1.tbl1_suf]",
+                        "[db1.tbl2_suf]",
+                        "[db1.tbl3_suf]",
+                        "[db2.tbl2]",
+                        "[db2.tbl3]",
+                        "[db3.tbl3]");
+
+        // Capture the table name suffix and reuse the extracted part in the sink name.
+        assertThat(testStdRegExpRoute("db1.tbl(\\.*)", "db1.no$1", 
tablesToRoute))
+                .containsExactly(
+                        "[db1.no1]",
+                        "[db1.no2]",
+                        "[db1.no3]",
+                        "[db2.tbl2]",
+                        "[db2.tbl3]",
+                        "[db3.tbl3]");
+
+        // Capture databases and append database prefix.
+        assertThat(testStdRegExpRoute("(\\.*).tbl3", "pre_$1.tbl3", 
tablesToRoute))
+                .containsExactly(
+                        "[db1.tbl1]",
+                        "[db1.tbl2]",
+                        "[pre_db1.tbl3]",
+                        "[db2.tbl2]",
+                        "[pre_db2.tbl3]",
+                        "[pre_db3.tbl3]");
+
+        // Capture databases and append database suffix.
+        assertThat(testStdRegExpRoute("(\\.*).tbl3", "$1_suf.tbl3", 
tablesToRoute))
+                .containsExactly(
+                        "[db1.tbl1]",
+                        "[db1.tbl2]",
+                        "[db1_suf.tbl3]",
+                        "[db2.tbl2]",
+                        "[db2_suf.tbl3]",
+                        "[db3_suf.tbl3]");
+
+        // Capture databases and extract database parts.
+        assertThat(testStdRegExpRoute("db(\\.*).(tbl\\.*)", "no$1.$2", 
tablesToRoute))
+                .containsExactly(
+                        "[no1.tbl1]",
+                        "[no1.tbl2]",
+                        "[no1.tbl3]",
+                        "[no2.tbl2]",
+                        "[no2.tbl3]",
+                        "[no3.tbl3]");
+
+        // Capture multiple parts and append extra tags.
+        assertThat(
+                        testStdRegExpRoute(
+                                "db(\\.*).tbl(\\.*)", 
"Database$1.Collection$2", tablesToRoute))
+                .containsExactly(
+                        "[Database1.Collection1]",
+                        "[Database1.Collection2]",
+                        "[Database1.Collection3]",
+                        "[Database2.Collection2]",
+                        "[Database2.Collection3]",
+                        "[Database3.Collection3]");
+    }
 }
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java
index 99d6929d1..40b40b468 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.event.TruncateTableEvent;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
+import org.apache.flink.cdc.common.route.TableIdRouter;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java
index 8086f650e..af273d13d 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.route.RouteRule;
+import org.apache.flink.cdc.common.route.TableIdRouter;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.runtime.partitioning.PartitioningEvent;
@@ -62,7 +63,14 @@ public abstract class SchemaTestBase {
                     new RouteRule("db_5.table_\\.*", "db_5.prefix_<>_suffix", 
"<>"),
 
                     // Irrelevant routes
-                    new RouteRule("foo", "bar", null));
+                    new RouteRule("foo", "bar", null),
+
+                    // Standard RegExp capturing rules
+                    new RouteRule(
+                            "re_\\d+.table_(\\.*)",
+                            "database.another_table_with_$1$1$1_index",
+                            null),
+                    new RouteRule("(inv_\\d+).(table_\\.*)", "$2.$1", null));
 
     protected static final TableIdRouter TABLE_ID_ROUTER = new 
TableIdRouter(ROUTING_RULES);
 
