This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 538b8faa1 [FLINK-38779] Route module supports standard RegExp
replacement rules (#4190)
538b8faa1 is described below
commit 538b8faa1d8826f8d15a902a94a2e77b13e12093
Author: yuxiqian <[email protected]>
AuthorDate: Thu Jan 15 14:43:47 2026 +0800
[FLINK-38779] Route module supports standard RegExp replacement rules
(#4190)
---
docs/content.zh/docs/core-concept/route.md | 23 +++-
docs/content/docs/core-concept/route.md | 23 +++-
.../flink/cdc/common/route}/TableIdRouter.java | 110 +++++++++++++----
.../mysql/factory/MySqlDataSourceFactory.java | 57 +--------
.../operators/schema/common/SchemaDerivator.java | 1 +
.../operators/schema/common/SchemaRegistry.java | 1 +
.../schema/distributed/SchemaOperator.java | 2 +-
.../schema/regular/BatchSchemaOperator.java | 2 +-
.../operators/schema/regular/SchemaOperator.java | 2 +-
.../common => common/route}/TableIdRouterTest.java | 131 ++++++++++++++++++++-
.../schema/common/SchemaDerivatorTest.java | 1 +
.../operators/schema/common/SchemaTestBase.java | 10 +-
12 files changed, 278 insertions(+), 85 deletions(-)
diff --git a/docs/content.zh/docs/core-concept/route.md
b/docs/content.zh/docs/core-concept/route.md
index fa1a95353..774915d3d 100644
--- a/docs/content.zh/docs/core-concept/route.md
+++ b/docs/content.zh/docs/core-concept/route.md
@@ -85,4 +85,25 @@ route:
description: route all tables in source_db to sink_db
```
-然后,`source_db` 库下所有的表都会被同步到 `sink_db` 库下。
\ No newline at end of file
+然后,`source_db` 库下所有的表都会被同步到 `sink_db` 库下。
+
+## 高级:基于正则捕获组的替换规则
+
+您可以在 `source-table` 字段中定义正则表达式的捕获组:
+
+```yaml
+route:
+ - source-table: db_(\.*).(\.*)_tbl
+ sink-table: sink_db_$1.sink_table_$2
+```
+
+这里我们创建了两个捕获组,分别用来匹配数据库名 `db_` 之后的后缀和表名 `_tbl` 之前的前缀。
+
+以上游表 `db_foo.bar_tbl` 为例,我们将会从中提取出 `(foo, bar)` 作为捕获组,并且将其依次绑定到 `$1` 和 `$2`
变量中。
+因此,这张表将被路由到 `sink_db_foo.sink_table_bar` 下游表中。
+
+{{< hint info >}}
+
+注意:基于正则捕获组的替换规则无法与 `replace-symbol` 选项搭配使用。
+
+{{< /hint >}}
diff --git a/docs/content/docs/core-concept/route.md
b/docs/content/docs/core-concept/route.md
index ca5855d58..394f5a56a 100644
--- a/docs/content/docs/core-concept/route.md
+++ b/docs/content/docs/core-concept/route.md
@@ -86,4 +86,25 @@ route:
description: route all tables in source_db to sink_db
```
-Then, all tables including `source_db.XXX` will be routed to `sink_db.XXX`
without hassle.
\ No newline at end of file
+Then, all tables including `source_db.XXX` will be routed to `sink_db.XXX`
without hassle.
+
+## Advanced: RegExp Capturing & Replacement Rules
+
+You can also define RegExp capturing groups in the `source-table` field and reference them in `sink-table` as `$1`, `$2`, and so on:
+
+```yaml
+route:
+ - source-table: db_(\.*).(\.*)_tbl
+ sink-table: sink_db_$1.sink_table_$2
+```
+
+Here we create two capturing groups: one matching the database-name suffix after `db_`, and one matching the table-name prefix before `_tbl`.
+
+For the upstream table `db_foo.bar_tbl`, the captured values `(foo, bar)` will be extracted and bound to `$1` and `$2`, respectively.
+As a result, this table will be routed to the downstream table `sink_db_foo.sink_table_bar`.
+
+{{< hint info >}}
+
+Note: standard RegExp capturing groups cannot be used together with the `replace-symbol` option.
+
+{{< /hint >}}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouter.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java
similarity index 50%
rename from
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouter.java
rename to
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java
index 6131b7f4c..06126b88a 100755
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouter.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java
@@ -15,23 +15,29 @@
* limitations under the License.
*/
-package org.apache.flink.cdc.runtime.operators.schema.common;
+package org.apache.flink.cdc.common.route;
import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nonnull;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
@@ -39,20 +45,75 @@ import java.util.stream.Collectors;
* Calculates how upstream data change events should be dispatched to
downstream tables. Returns one
* or many destination Table IDs based on provided routing rules.
*/
+@PublicEvolving
public class TableIdRouter {
- private final List<Tuple3<Selectors, String, String>> routes;
- private final LoadingCache<TableId, List<TableId>> routingCache;
+ private static final Logger LOG =
LoggerFactory.getLogger(TableIdRouter.class);
private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1);
+ private final List<Tuple3<Pattern, String, String>> routes;
+ private final LoadingCache<TableId, List<TableId>> routingCache;
+
+ private static final String DOT_PLACEHOLDER = "_dot_placeholder_";
+
+ /**
+ * Converts a CDC-style table list (the syntax accepted by {@link Selectors}) into a standard
+ * Java RegExp pattern string that can be matched against the string form of a TableId.
+ *
+ * <p>The CDC-style syntax differs from standard RegExp syntax as follows:
+ *
+ * <p>1) Multiple table patterns are separated by {@code ,} instead of {@code |}.
+ *
+ * <p>2) To match any character with a dot ({@code .}), the dot must be escaped with a
+ * backslash ({@code \.}).
+ *
+ * <p>3) An unescaped {@code .} separates database and table names. In the converted pattern it
+ * is escaped, so that it matches a literal dot instead of acting as the meta-character.
+ *
+ * <p>For example, {@code db\.*.tbl\.*,foo.bar} is converted to {@code db.*\.tbl.*|foo\.bar}.
+ * Blank entries in the comma-separated list are ignored.
+ *
+ * @param tables CDC-style table list
+ * @return the equivalent standard RegExp pattern string
+ */
+ public static String convertTableListToRegExpPattern(String tables) {
+ LOG.info("Rewriting CDC style table capture list: {}", tables);
+
+ // In CDC-style table matching, table names are separated by `,`; standard RegExp uses `|`
+ // for alternatives. Blank entries (e.g. from `db.a,,db.b` or `db.a, ,db.b`) are dropped,
+ // because an empty alternative would match the empty string.
+ tables =
+ Arrays.stream(tables.split(","))
+ .map(String::trim)
+ .filter(entry -> !entry.isEmpty())
+ .collect(Collectors.joining("|"));
+ LOG.debug("Expression after replacing comma with vert separator: {}", tables);
+
+ // Essentially, we swap the meaning of escaped `\.` and unescaped `.`:
+ // - in CDC table-list syntax, `\.` is the RegExp "any character" token and `.` separates
+ //   database and table names;
+ // - when matching a TableId string with standard RegExp, `\.` matches a literal dot and
+ //   `.` is the meta-character.
+
+ // Step 1: replace every escaped dot (`\.`) with DOT_PLACEHOLDER (which contains no `.`), so
+ // that it is left alone by step 2.
+ // For example, `db\.*.tbl\.*` => `db<P>*.tbl<P>*`, where <P> stands for DOT_PLACEHOLDER.
+ String unescapedTables = tables.replace("\\.", DOT_PLACEHOLDER);
+ LOG.debug("Expression after un-escaping dots as RegEx meta-character: {}", unescapedTables);
+
+ // Step 2: quote all remaining dots (`.` => `\.`); they are database/table separators and
+ // must match a literal dot.
+ // For example, `db<P>*.tbl<P>*` => `db<P>*\.tbl<P>*`.
+ String unescapedTablesWithDbTblSeparator = unescapedTables.replace(".", "\\.");
+ LOG.debug("Re-escaping dots as TableId delimiter: {}", unescapedTablesWithDbTblSeparator);
+
+ // Step 3: restore the placeholder to the RegExp meta-character (`.`).
+ // For example, `db<P>*\.tbl<P>*` => `db.*\.tbl.*`.
+ String standardRegExpTableCaptureList =
+ unescapedTablesWithDbTblSeparator.replace(DOT_PLACEHOLDER, ".");
+ LOG.info("Final standard RegExp table capture list: {}", standardRegExpTableCaptureList);
+
+ return standardRegExpTableCaptureList;
+ }
+
public TableIdRouter(List<RouteRule> routingRules) {
this.routes = new ArrayList<>();
for (RouteRule rule : routingRules) {
try {
- String tableInclusions = rule.sourceTable;
- Selectors selectors =
- new
Selectors.SelectorsBuilder().includeTables(tableInclusions).build();
- routes.add(new Tuple3<>(selectors, rule.sinkTable,
rule.replaceSymbol));
+ routes.add(
+ new Tuple3<>(
+
Pattern.compile(convertTableListToRegExpPattern(rule.sourceTable)),
+ rule.sinkTable,
+ rule.replaceSymbol));
} catch (PatternSyntaxException e) {
throw new IllegalArgumentException(
String.format(
@@ -80,7 +141,7 @@ public class TableIdRouter {
private List<TableId> calculateRoute(TableId sourceTableId) {
List<TableId> routedTableIds =
routes.stream()
- .filter(route -> route.f0.isMatch(sourceTableId))
+ .filter(route -> matches(route.f0, sourceTableId))
.map(route -> resolveReplacement(sourceTableId, route))
.collect(Collectors.toList());
if (routedTableIds.isEmpty()) {
@@ -90,9 +151,14 @@ public class TableIdRouter {
}
+ /**
+ * Resolves the sink table ID of {@code originalTable} for a matched {@code route} (source
+ * pattern, sink table, replace symbol). With a replace symbol, every occurrence of the symbol in
+ * the sink table is replaced by the source table name; otherwise {@code $n} group references in
+ * the sink table are expanded against the full source table ID.
+ */
private TableId resolveReplacement(
- TableId originalTable, Tuple3<Selectors, String, String> route) {
+ TableId originalTable, Tuple3<Pattern, String, String> route) {
if (route.f2 != null) {
return TableId.parse(route.f1.replace(route.f2, originalTable.getTableName()));
+ } else {
+ // Expand `$n` references against a match of the WHOLE table ID, consistent with how the
+ // route was selected. Matcher#find() + Matcher#replaceAll() would be wrong here:
+ // - find() may stop at a shorter alternative, e.g. `db\.orders|db\.orders_archive`
+ //   matches only `db.orders` inside `db.orders_archive`, leaving `_archive` appended;
+ // - replaceAll() substitutes again for a trailing empty match when the pattern can match
+ //   the empty string (e.g. `.*`), duplicating the sink table name.
+ Matcher matcher = route.f0.matcher(originalTable.toString());
+ if (matcher.matches()) {
+ StringBuffer sinkTableId = new StringBuffer();
+ matcher.appendReplacement(sinkTableId, route.f1);
+ matcher.appendTail(sinkTableId);
+ return TableId.parse(sinkTableId.toString());
+ }
}
return TableId.parse(route.f1);
}
@@ -111,18 +177,16 @@ public class TableIdRouter {
if (routes.isEmpty()) {
return new ArrayList<>();
}
- List<Set<TableId>> routedTableIds =
- routes.stream()
- .map(
- route -> {
- return tableIdSet.stream()
- .filter(
- tableId -> {
- return
route.f0.isMatch(tableId);
- })
- .collect(Collectors.toSet());
- })
- .collect(Collectors.toList());
- return routedTableIds;
+ return routes.stream()
+ .map(
+ route ->
+ tableIdSet.stream()
+ .filter(tableId -> matches(route.f0,
tableId))
+ .collect(Collectors.toSet()))
+ .collect(Collectors.toList());
+ }
+
+ /** Returns whether {@code pattern} matches the entire string form of {@code tableId}. */
+ private static boolean matches(Pattern pattern, TableId tableId) {
+ String qualifiedName = tableId.toString();
+ Matcher matcher = pattern.matcher(qualifiedName);
+ return matcher.matches();
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
index b775cebb0..1b3540da0 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java
@@ -30,7 +30,6 @@ import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;
-import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions;
import
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
@@ -45,7 +44,6 @@ import org.apache.flink.table.catalog.ObjectPath;
import com.mysql.cj.conf.PropertyKey;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
-import io.debezium.relational.Tables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +61,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import static
org.apache.flink.cdc.common.route.TableIdRouter.convertTableListToRegExpPattern;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_META_GROUP_SIZE;
@@ -231,7 +230,7 @@ public class MySqlDataSourceFactory implements
DataSourceFactory {
}
if (scanBinlogNewlyAddedTableEnabled) {
- String newTables = validateTableAndReturnDebeziumStyle(tables);
+ String newTables = convertTableListToRegExpPattern(tables);
configFactory.tableList(newTables);
configFactory.excludeTableList(tablesExclude);
@@ -516,58 +515,6 @@ public class MySqlDataSourceFactory implements
DataSourceFactory {
private static final String DOT_PLACEHOLDER = "_$dot_placeholder$_";
- /**
- * Currently, The supported regular syntax is not exactly the same in
{@link Selectors} and
- * {@link Tables.TableFilter}.
- *
- * <p>The main distinction are :
- *
- * <p>1) {@link Selectors} use {@code ,} to split table names and {@link
Tables.TableFilter} use
- * `|` to split table names.
- *
- * <p>2) If there is a need to use a dot ({@code .}) in a regular
expression to match any
- * character, it is necessary to escape the dot with a backslash, refer to
{@link
- * MySqlDataSourceOptions#TABLES}.
- *
- * <p>3) The unescaped {@code .} is used as the separator of database and
table name. When
- * converting to Debezium style, it is expected to be escaped to match the
dot ({@code .})
- * literally instead of the meta-character.
- */
- private String validateTableAndReturnDebeziumStyle(String tables) {
- LOG.info("Rewriting CDC style table capture list: {}", tables);
-
- // In CDC-style table matching, table names could be separated by `,`
character.
- // Convert it to `|` as it's standard RegEx syntax.
- tables =
-
Arrays.stream(tables.split(",")).map(String::trim).collect(Collectors.joining("|"));
- LOG.info("Expression after replacing comma with vert separator: {}",
tables);
-
- // Essentially, we're just trying to swap escaped `\\.` and unescaped
`.`.
- // In our table matching syntax, `\\.` means RegEx token matcher and
`.` means database &
- // table name separator.
- // On the contrary, while we're matching TableId string, `\\.` means
matching the "dot"
- // literal and `.` is the meta-character.
-
- // Step 1: escape the dot with a backslash, but keep it as a
placeholder (like `$`).
- // For example, `db\.*.tbl\.*` => `db$*.tbl$*`
- String unescapedTables = tables.replace("\\.", DOT_PLACEHOLDER);
- LOG.info("Expression after unescaping dots as RegEx meta-character:
{}", unescapedTables);
-
- // Step 2: replace all remaining dots (`.`) to quoted version (`\.`),
as a separator between
- // database and table names.
- // For example, `db$*.tbl$*` => `db$*\.tbl$*`
- String unescapedTablesWithDbTblSeparator =
unescapedTables.replace(".", "\\.");
- LOG.info("Re-escaping dots as TableId delimiter: {}",
unescapedTablesWithDbTblSeparator);
-
- // Step 3: restore placeholder to normal RegEx matcher (`.`)
- // For example, `db$*\.tbl$*` => `db.*\.tbl.*`
- String debeziumStyleTableCaptureList =
- unescapedTablesWithDbTblSeparator.replace(DOT_PLACEHOLDER,
".");
- LOG.info("Final Debezium-style table capture list: {}",
debeziumStyleTableCaptureList);
-
- return debeziumStyleTableCaptureList;
- }
-
/** Replaces the default timezone placeholder with session timezone, if
applicable. */
private static ZoneId getServerTimeZone(Configuration config) {
final String serverTimeZone = config.get(SERVER_TIME_ZONE);
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java
index a2659561e..8d56f7258 100755
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java
@@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventWithPreSchema;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.route.TableIdRouter;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
index 8bea59548..640fb765c 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
@@ -22,6 +22,7 @@ import
org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
+import org.apache.flink.cdc.common.route.TableIdRouter;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import
org.apache.flink.cdc.runtime.operators.schema.common.event.FlushSuccessEvent;
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
index f31cbabdc..15c0571ba 100755
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
@@ -26,11 +26,11 @@ import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
+import org.apache.flink.cdc.common.route.TableIdRouter;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import
org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
-import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
import
org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics;
import
org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeRequest;
import
org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeResponse;
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java
index 27012edca..df990425a 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java
@@ -25,11 +25,11 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
+import org.apache.flink.cdc.common.route.TableIdRouter;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager;
-import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
index feb1a38d5..901ba168d 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
@@ -26,11 +26,11 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
+import org.apache.flink.cdc.common.route.TableIdRouter;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import
org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
-import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
import
org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics;
import
org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeRequest;
import
org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse;
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java
similarity index 53%
rename from
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterTest.java
rename to
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java
index ba21b63eb..b8e487700 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package org.apache.flink.cdc.runtime.operators.schema.common;
+package org.apache.flink.cdc.common.route;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.runtime.operators.schema.common.SchemaTestBase;
import org.junit.jupiter.api.Test;
@@ -26,6 +27,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
@@ -38,6 +40,17 @@ public class TableIdRouterTest extends SchemaTestBase {
.collect(Collectors.toList());
}
+ /** Shorthand for {@link TableIdRouter#convertTableListToRegExpPattern(String)}. */
+ private static String testConvert(String input) {
+ return TableIdRouter.convertTableListToRegExpPattern(input);
+ }
+
+ @Test
+ void testConvertingDebeziumTableIdToStandardRegex() {
+ // An unescaped `.` separates database and table names and must match a literal dot.
+ assertThat(testConvert("foo.bar")).isEqualTo("foo\\.bar");
+ // `,`-separated entries become `|` alternatives; surrounding whitespace is trimmed.
+ assertThat(testConvert("foo.bar,foo.baz")).isEqualTo("foo\\.bar|foo\\.baz");
+ assertThat(testConvert("foo.bar, foo.baz")).isEqualTo("foo\\.bar|foo\\.baz");
+ // An escaped `\.` is the RegExp "any character" token.
+ assertThat(testConvert("db.\\.*")).isEqualTo("db\\..*");
+ // Capturing groups survive the conversion (example from the route documentation).
+ assertThat(testConvert("db_(\\.*).(\\.*)_tbl")).isEqualTo("db_(.*)\\.(.*)_tbl");
+ }
+
@Test
void testImplicitRoute() {
assertThat(testRoute("db_0.table_1")).containsExactlyInAnyOrder("db_0.table_1");
@@ -129,6 +142,122 @@ public class TableIdRouterTest extends SchemaTestBase {
TableId.parse("db_5.table_1"),
TableId.parse("db_5.table_2"),
TableId.parse("db_5.table_3"))),
+ new HashSet<>(),
+ new HashSet<>(),
new HashSet<>());
}
+
+ @Test
+ void testRegExpCapturingGroupExpression() {
+ // Rule `re_\d+.table_(\.*)` -> `database.another_table_with_$1$1$1_index` from
+ // SchemaTestBase: the captured table-name suffix is repeated three times.
+ Stream<String> suffixCapturingSources =
+ Stream.of("re_1.table_1", "re_22.table_22", "re_333.table_333");
+ assertThat(suffixCapturingSources)
+ .map(TableIdRouterTest::testRoute)
+ .map(List::toString)
+ .containsExactly(
+ "[database.another_table_with_111_index]",
+ "[database.another_table_with_222222_index]",
+ "[database.another_table_with_333333333_index]");
+
+ // Rule `(inv_\d+).(table_\.*)` -> `$2.$1`: database and table names are swapped.
+ Stream<String> swappingSources =
+ Stream.of("inv_1.table_foo", "inv_22.table_bar", "inv_333.table_baz");
+ assertThat(swappingSources)
+ .map(TableIdRouterTest::testRoute)
+ .map(List::toString)
+ .containsExactly("[table_foo.inv_1]", "[table_bar.inv_22]", "[table_baz.inv_333]");
+ }
+
+ /**
+ * Builds a router from a single route rule without replace symbol, routes every given source
+ * table through it and returns the string form of each routing result, in input order.
+ */
+ private static List<String> testStdRegExpRoute(
+ String sourceRouteRule, String sinkRouteRule, List<String> sourceTables) {
+ RouteRule singleRule = new RouteRule(sourceRouteRule, sinkRouteRule);
+ TableIdRouter router = new TableIdRouter(List.of(singleRule));
+ return sourceTables.stream()
+ .map(tableName -> router.route(TableId.parse(tableName)).toString())
+ .collect(Collectors.toList());
+ }
+
+ @Test
+ void testRegExpComplexRouting() {
+ List<String> tablesToRoute =
+ List.of("db1.tbl1", "db1.tbl2", "db1.tbl3", "db2.tbl2", "db2.tbl3", "db3.tbl3");
+
+ // Merge all tables of db1 into one sink table; tables not matched by the rule keep their
+ // original IDs.
+ assertThat(testStdRegExpRoute("db1.(\\.*)", "db1.combined", tablesToRoute))
+ .containsExactly(
+ "[db1.combined]",
+ "[db1.combined]",
+ "[db1.combined]",
+ "[db2.tbl2]",
+ "[db2.tbl3]",
+ "[db3.tbl3]");
+
+ // Capture db1 table names and prepend a prefix.
+ assertThat(testStdRegExpRoute("db1.(\\.*)", "db1.pre_$1", tablesToRoute))
+ .containsExactly(
+ "[db1.pre_tbl1]",
+ "[db1.pre_tbl2]",
+ "[db1.pre_tbl3]",
+ "[db2.tbl2]",
+ "[db2.tbl3]",
+ "[db3.tbl3]");
+
+ // Capture db1 table names and append a suffix.
+ assertThat(testStdRegExpRoute("db1.(\\.*)", "db1.$1_suf", tablesToRoute))
+ .containsExactly(
+ "[db1.tbl1_suf]",
+ "[db1.tbl2_suf]",
+ "[db1.tbl3_suf]",
+ "[db2.tbl2]",
+ "[db2.tbl3]",
+ "[db3.tbl3]");
+
+ // Capture the part of db1 table names after `tbl` and give it a new prefix.
+ assertThat(testStdRegExpRoute("db1.tbl(\\.*)", "db1.no$1", tablesToRoute))
+ .containsExactly(
+ "[db1.no1]",
+ "[db1.no2]",
+ "[db1.no3]",
+ "[db2.tbl2]",
+ "[db2.tbl3]",
+ "[db3.tbl3]");
+
+ // Capture the database names of `tbl3` tables and prepend a database prefix.
+ assertThat(testStdRegExpRoute("(\\.*).tbl3", "pre_$1.tbl3", tablesToRoute))
+ .containsExactly(
+ "[db1.tbl1]",
+ "[db1.tbl2]",
+ "[pre_db1.tbl3]",
+ "[db2.tbl2]",
+ "[pre_db2.tbl3]",
+ "[pre_db3.tbl3]");
+
+ // Capture the database names of `tbl3` tables and append a database suffix.
+ assertThat(testStdRegExpRoute("(\\.*).tbl3", "$1_suf.tbl3", tablesToRoute))
+ .containsExactly(
+ "[db1.tbl1]",
+ "[db1.tbl2]",
+ "[db1_suf.tbl3]",
+ "[db2.tbl2]",
+ "[db2_suf.tbl3]",
+ "[db3_suf.tbl3]");
+
+ // Capture the database-name part after `db` and the whole table name, then rebuild the ID.
+ assertThat(testStdRegExpRoute("db(\\.*).(tbl\\.*)", "no$1.$2", tablesToRoute))
+ .containsExactly(
+ "[no1.tbl1]",
+ "[no1.tbl2]",
+ "[no1.tbl3]",
+ "[no2.tbl2]",
+ "[no2.tbl3]",
+ "[no3.tbl3]");
+
+ // Capture the numeric parts of both names and attach new prefixes to each of them.
+ assertThat(
+ testStdRegExpRoute(
+ "db(\\.*).tbl(\\.*)", "Database$1.Collection$2", tablesToRoute))
+ .containsExactly(
+ "[Database1.Collection1]",
+ "[Database1.Collection2]",
+ "[Database1.Collection3]",
+ "[Database2.Collection2]",
+ "[Database2.Collection3]",
+ "[Database3.Collection3]");
+ }
}
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java
index 99d6929d1..40b40b468 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
+import org.apache.flink.cdc.common.route.TableIdRouter;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java
index 8086f650e..af273d13d 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.route.RouteRule;
+import org.apache.flink.cdc.common.route.TableIdRouter;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.partitioning.PartitioningEvent;
@@ -62,7 +63,14 @@ public abstract class SchemaTestBase {
new RouteRule("db_5.table_\\.*", "db_5.prefix_<>_suffix",
"<>"),
// Irrelevant routes
- new RouteRule("foo", "bar", null));
+ new RouteRule("foo", "bar", null),
+
+ // Standard RegExp capturing rules
+ new RouteRule(
+ "re_\\d+.table_(\\.*)",
+ "database.another_table_with_$1$1$1_index",
+ null),
+ new RouteRule("(inv_\\d+).(table_\\.*)", "$2.$1", null));
protected static final TableIdRouter TABLE_ID_ROUTER = new
TableIdRouter(ROUTING_RULES);