This is an automated email from the ASF dual-hosted git repository.
yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new c9ffab779 [FLINK-38831][runtime] Route configuration support
FIRST_MATCH mode (#4212)
c9ffab779 is described below
commit c9ffab7792a443478cca3b4fff985354b2520ca4
Author: SkylerLin <[email protected]>
AuthorDate: Tue Mar 17 19:32:13 2026 +0800
[FLINK-38831][runtime] Route configuration support FIRST_MATCH mode (#4212)
---
README.md | 1 +
docs/content.zh/docs/core-concept/data-pipeline.md | 1 +
docs/content.zh/docs/core-concept/route.md | 23 ++
docs/content/docs/core-concept/data-pipeline.md | 1 +
docs/content/docs/core-concept/route.md | 25 ++-
.../parser/YamlPipelineDefinitionParserTest.java | 65 ++++++
.../pipeline-definition-with-route-mode.yaml | 48 +++++
.../flink/cdc/common/pipeline/PipelineOptions.java | 16 ++
.../flink/cdc/common/pipeline/RouteMode.java | 29 +++
.../flink/cdc/common/route/TableIdRouter.java | 65 ++++--
.../flink/cdc/composer/definition/PipelineDef.java | 8 +
.../cdc/composer/flink/FlinkPipelineComposer.java | 6 +-
.../flink/translator/SchemaOperatorTranslator.java | 40 +++-
.../flink/FlinkPipelineComposerITCase.java | 141 +++++++++++++
.../flink/cdc/pipeline/tests/RouteE2eITCase.java | 141 +++++++++++++
.../operators/schema/common/SchemaRegistry.java | 6 +-
.../schema/distributed/SchemaCoordinator.java | 3 +
.../distributed/SchemaCoordinatorProvider.java | 5 +
.../schema/distributed/SchemaOperator.java | 6 +-
.../schema/distributed/SchemaOperatorFactory.java | 9 +-
.../schema/regular/BatchSchemaOperator.java | 10 +-
.../schema/regular/SchemaCoordinator.java | 3 +
.../schema/regular/SchemaCoordinatorProvider.java | 5 +
.../operators/schema/regular/SchemaOperator.java | 15 +-
.../schema/regular/SchemaOperatorFactory.java | 9 +-
.../flink/cdc/common/route/TableIdRouterTest.java | 5 +-
.../schema/common/SchemaDerivatorTest.java | 4 +-
.../operators/schema/common/SchemaTestBase.java | 4 +-
.../schema/common/TableIdRouterMatchModeTest.java | 231 +++++++++++++++++++++
.../schema/distributed/SchemaEvolveTest.java | 4 +
.../operators/schema/regular/SchemaEvolveTest.java | 28 ++-
.../schema/regular/SchemaOperatorTest.java | 5 +-
.../DistributedEventOperatorTestHarness.java | 2 +
.../operators/RegularEventOperatorTestHarness.java | 2 +
34 files changed, 921 insertions(+), 45 deletions(-)
diff --git a/README.md b/README.md
index ab17cfa66..8b818a923 100644
--- a/README.md
+++ b/README.md
@@ -100,6 +100,7 @@ SELECT * FROM test_table;
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
+ route-mode: ALL_MATCH
user-defined-function:
- name: addone
classpath: com.example.functions.AddOneFunctionClass
diff --git a/docs/content.zh/docs/core-concept/data-pipeline.md
b/docs/content.zh/docs/core-concept/data-pipeline.md
index a2edc0854..6bde6e462 100644
--- a/docs/content.zh/docs/core-concept/data-pipeline.md
+++ b/docs/content.zh/docs/core-concept/data-pipeline.md
@@ -118,6 +118,7 @@ under the License.
| `parallelism` | pipeline的全局并发度,默认值是1。
[...]
| `local-time-zone` | 作业级别的本地时区。
[...]
| `execution.runtime-mode` | pipeline 的运行模式,包含 STREAMING 和 BATCH,默认值是
STREAMING。
[...]
+| `route-mode` | [路由]({{< ref "docs/core-concept/route"
>}}#路由模式)规则的匹配模式。可选值:`ALL_MATCH`(默认,应用所有匹配的规则)或 `FIRST_MATCH`(只应用第一个匹配的规则)。
| optional
|
| `schema.change.behavior` | 如何处理 [schema 变更]({{< ref
"docs/core-concept/schema-evolution" >}})。可选值:[`exception`]({{< ref
"docs/core-concept/schema-evolution" >}}#exception-mode)、[`evolve`]({{< ref
"docs/core-concept/schema-evolution" >}}#evolve-mode)、[`try_evolve`]({{< ref
"docs/core-concept/schema-evolution" >}}#tryevolve-mode)、[`lenient`]({{< ref
"docs/core-concept/schema-evolution" >}}#lenient-mode)(默认值)或 [`ignore`]({{< ref
"docs/core-concept/schema-evolution" >}}#ignore-mode)。 [...]
| `schema.operator.uid` | Schema 算子的唯一 ID。此 ID
用于算子间通信,必须在所有算子中保持唯一。**已废弃**:请使用 `operator.uid.prefix` 代替。
[...]
| `schema-operator.rpc-timeout` | SchemaOperator 等待下游 SchemaChangeEvent
应用完成的超时时间,默认值是 3 分钟。
[...]
diff --git a/docs/content.zh/docs/core-concept/route.md
b/docs/content.zh/docs/core-concept/route.md
index 774915d3d..88428f853 100644
--- a/docs/content.zh/docs/core-concept/route.md
+++ b/docs/content.zh/docs/core-concept/route.md
@@ -39,6 +39,29 @@ under the License.
一个 Route 模块可以包含一个或多个 source-table/sink-table 规则。
+# 路由模式
+默认情况下,所有匹配的路由规则都会被应用到表上。你可以在 pipeline 配置中通过 `route-mode` 选项来改变这一行为:
+
+| 值 | 描述 |
+|--------------|-------------------------------------------------|
+| `ALL_MATCH` | 应用所有匹配的路由规则到表上。这是默认模式。 |
+| `FIRST_MATCH`| 只应用第一个匹配的路由规则,并停止后续规则的计算。 |
+
+例如,使用 `FIRST_MATCH` 模式:
+
+```yaml
+pipeline:
+ name: Sync MySQL Database to Doris
+ parallelism: 2
+ route-mode: FIRST_MATCH
+```
+
+{{< hint info >}}
+
+当使用 `FIRST_MATCH` 模式时,路由规则会按照定义的顺序进行计算。第一个匹配源表的规则会被应用,后续的规则将被跳过。
+
+{{< /hint >}}
+
# 示例
## 路由一个 Data Source 表到一个 Data Sink 表
如果同步一个 `mydb` 数据库中的 `web_order` 表到一个相同库的 `ods_web_order` 表,我们可以使用下面的 yaml
文件来定义这个路由:
diff --git a/docs/content/docs/core-concept/data-pipeline.md
b/docs/content/docs/core-concept/data-pipeline.md
index e8ab92624..2a9a27b50 100644
--- a/docs/content/docs/core-concept/data-pipeline.md
+++ b/docs/content/docs/core-concept/data-pipeline.md
@@ -120,6 +120,7 @@ Note that whilst the parameters are each individually
optional, at least one of
| `parallelism` | The global parallelism of the pipeline.
Defaults to 1.
[...]
| `local-time-zone` | The local time zone defines current session
time zone id.
[...]
| `execution.runtime-mode` | The runtime mode of the pipeline includes
STREAMING and BATCH, with the default value being STREAMING.
[...]
+| `route-mode` | The matching mode for [route]({{< ref
"docs/core-concept/route" >}}#route-mode) rules. One of: `ALL_MATCH` (default,
apply all matching rules) or `FIRST_MATCH` (apply only the first matching
rule).
[...]
| `schema.change.behavior` | How to handle [changes in schema]({{< ref
"docs/core-concept/schema-evolution" >}}). One of: [`exception`]({{< ref
"docs/core-concept/schema-evolution" >}}#exception-mode), [`evolve`]({{< ref
"docs/core-concept/schema-evolution" >}}#evolve-mode), [`try_evolve`]({{< ref
"docs/core-concept/schema-evolution" >}}#tryevolve-mode), [`lenient`]({{< ref
"docs/core-concept/schema-evolution" >}}#lenient-mode) (default) or
[`ignore`]({{< ref "docs/core-concept/sche [...]
| `schema.operator.uid` | The unique ID for schema operator. This ID
will be used for inter-operator communications and must be unique across
operators. **Deprecated**: use `operator.uid.prefix` instead.
[...]
| `schema-operator.rpc-timeout` | The timeout time for SchemaOperator to wait
downstream SchemaChangeEvent applying finished, the default value is 3 minutes.
[...]
diff --git a/docs/content/docs/core-concept/route.md
b/docs/content/docs/core-concept/route.md
index 394f5a56a..2c1de4435 100644
--- a/docs/content/docs/core-concept/route.md
+++ b/docs/content/docs/core-concept/route.md
@@ -28,7 +28,7 @@ under the License.
**Route** specifies the rule of matching a list of source-table and mapping to
sink-table. The most typical scenario is the merge of sub-databases and
sub-tables, routing multiple upstream source tables to the same sink table.
# Parameters
-To describe a route, the follows are required:
+To describe a route, the follows are required:
| parameter | meaning
| optional/required |
|----------------|---------------------------------------------------------------------------------------------|-------------------|
@@ -39,6 +39,29 @@ To describe a route, the follows are required:
A route module can contain a list of source-table/sink-table rules.
+# Route Mode
+By default, all matching route rules are applied to a table. You can configure
the `route-mode` option in the pipeline configuration to change this behavior:
+
+| Value | Description
|
+|--------------|------------------------------------------------------------------------|
+| `ALL_MATCH` | Apply all matching route rules to a table. This is the
default mode. |
+| `FIRST_MATCH`| Apply only the first matching route rule and stop evaluation.
|
+
+For example, to use `FIRST_MATCH` mode:
+
+```yaml
+pipeline:
+ name: Sync MySQL Database to Doris
+ parallelism: 2
+ route-mode: FIRST_MATCH
+```
+
+{{< hint info >}}
+
+When using `FIRST_MATCH` mode, route rules are evaluated in the order they are
defined. The first rule that matches the source table will be applied, and
subsequent rules will be skipped.
+
+{{< /hint >}}
+
# Example
## Route one Data Source table to one Data Sink table
if synchronize the table `web_order` in the database `mydb` to a Doris table
`ods_web_order`, we can use this yaml file to define this route:
diff --git
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index bbfafcf8e..8349c2977 100644
---
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -206,6 +206,15 @@ class YamlPipelineDefinitionParserTest {
assertThat(pipelineDef).isEqualTo(pipelineDefWithUdfOptions);
}
+ @Test
+ void testRouteMode() throws Exception {
+ URL resource =
+
Resources.getResource("definitions/pipeline-definition-with-route-mode.yaml");
+ YamlPipelineDefinitionParser parser = new
YamlPipelineDefinitionParser();
+ PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new
Configuration());
+ assertThat(pipelineDef).isEqualTo(pipelineDefWithRouteMode);
+ }
+
@Test
void testSchemaEvolutionTypesConfiguration() throws Exception {
testSchemaEvolutionTypesParsing(
@@ -716,4 +725,60 @@ class YamlPipelineDefinitionParserTest {
ImmutableMap.<String, String>builder()
.put("parallelism", "1")
.build()));
+
+ private final PipelineDef pipelineDefWithRouteMode =
+ new PipelineDef(
+ new SourceDef(
+ "mysql",
+ null,
+ Configuration.fromMap(
+ ImmutableMap.<String, String>builder()
+ .put("hostname", "localhost")
+ .put("port", "3306")
+ .put("username", "root")
+ .put("password", "123456")
+ .put("tables", "mydb.\\.*")
+ .put("server-id", "5400-5404")
+ .put("server-time-zone", "UTC")
+ .build())),
+ new SinkDef(
+ "doris",
+ null,
+ Configuration.fromMap(
+ ImmutableMap.<String, String>builder()
+ .put("fenodes", "127.0.0.1:8030")
+ .put("username", "root")
+ .put("password", "")
+ .build()),
+ ImmutableSet.of(
+ DROP_COLUMN,
+ ALTER_COLUMN_TYPE,
+ ADD_COLUMN,
+ CREATE_TABLE,
+ RENAME_COLUMN)),
+ Arrays.asList(
+ new RouteDef(
+ "mydb.order_.*",
+ "ods_db.ods_orders",
+ null,
+ "Merge all order sharded tables"),
+ new RouteDef(
+ "mydb.product_.*",
+ "ods_db.ods_products",
+ null,
+ "Merge all product sharded tables"),
+ new RouteDef(
+ "mydb.*",
+ "ods_db.ods_<>",
+ "<>",
+ "One-to-one mapping for other tables")),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Configuration.fromMap(
+ ImmutableMap.<String, String>builder()
+ .put("name",
"mysql_to_doris_with_route_match_mode")
+ .put("parallelism", "2")
+ .put("route-mode", "FIRST_MATCH")
+ .build()));
}
diff --git
a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-route-mode.yaml
b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-route-mode.yaml
new file mode 100644
index 000000000..94be52075
--- /dev/null
+++
b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-route-mode.yaml
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Example pipeline definition with route match-mode
+source:
+ type: mysql
+ hostname: localhost
+ port: 3306
+ username: root
+ password: 123456
+ tables: mydb.\.*
+ server-id: 5400-5404
+ server-time-zone: UTC
+
+sink:
+ type: doris
+ fenodes: 127.0.0.1:8030
+ username: root
+ password: ""
+
+route:
+ - source-table: mydb.order_.*
+ sink-table: ods_db.ods_orders
+ description: "Merge all order sharded tables"
+ - source-table: mydb.product_.*
+ sink-table: ods_db.ods_products
+ description: "Merge all product sharded tables"
+ - source-table: mydb.*
+ sink-table: ods_db.ods_<>
+ replace-symbol: <>
+ description: "One-to-one mapping for other tables"
+
+pipeline:
+ name: mysql_to_doris_with_route_match_mode
+ parallelism: 2
+ route-mode: FIRST_MATCH
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
index f27c37f7a..443d6145b 100644
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
@@ -72,6 +72,22 @@ public class PipelineOptions {
"EXCEPTION: Throw
an exception to terminate the sync pipeline.")))
.build());
+ public static final ConfigOption<RouteMode> PIPELINE_ROUTE_MODE =
+ ConfigOptions.key("route-mode")
+ .enumType(RouteMode.class)
+ .defaultValue(RouteMode.ALL_MATCH)
+ .withDescription(
+ Description.builder()
+ .text("Match mode for routing rules. ")
+ .linebreak()
+ .add(
+ ListElement.list(
+ text(
+ "ALL_MATCH: Apply
all matching route rules to a table."),
+ text(
+ "FIRST_MATCH:
Apply only the first matching route rule and stop evaluation.")))
+ .build());
+
public static final ConfigOption<String> PIPELINE_LOCAL_TIME_ZONE =
ConfigOptions.key("local-time-zone")
.stringType()
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/RouteMode.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/RouteMode.java
new file mode 100644
index 000000000..bd39b0006
--- /dev/null
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/RouteMode.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.pipeline;
+
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+
+/** Route mode for routing rules. */
+@PublicEvolving
+public enum RouteMode {
+ /** Match all applicable routing rules. */
+ ALL_MATCH,
+ /** Match only the first applicable routing rule. */
+ FIRST_MATCH;
+}
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java
index 06126b88a..9c9d7d016 100755
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.common.route;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
@@ -34,7 +35,10 @@ import javax.annotation.Nonnull;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -53,6 +57,7 @@ public class TableIdRouter {
private final List<Tuple3<Pattern, String, String>> routes;
private final LoadingCache<TableId, List<TableId>> routingCache;
+ private final RouteMode routeMode;
private static final String DOT_PLACEHOLDER = "_dot_placeholder_";
@@ -106,6 +111,11 @@ public class TableIdRouter {
}
public TableIdRouter(List<RouteRule> routingRules) {
+ this(routingRules, RouteMode.ALL_MATCH);
+ }
+
+ public TableIdRouter(List<RouteRule> routingRules, RouteMode routeMode) {
+ this.routeMode = routeMode;
this.routes = new ArrayList<>();
for (RouteRule rule : routingRules) {
try {
@@ -139,11 +149,19 @@ public class TableIdRouter {
}
private List<TableId> calculateRoute(TableId sourceTableId) {
- List<TableId> routedTableIds =
- routes.stream()
- .filter(route -> matches(route.f0, sourceTableId))
- .map(route -> resolveReplacement(sourceTableId, route))
- .collect(Collectors.toList());
+ List<TableId> routedTableIds = new ArrayList<>();
+
+ for (Tuple3<Pattern, String, String> route : routes) {
+ if (matches(route.f0, sourceTableId)) {
+ routedTableIds.add(resolveReplacement(sourceTableId, route));
+
+ // If match mode is FIRST_MATCH, stop after the first match
+ if (routeMode == RouteMode.FIRST_MATCH) {
+ break;
+ }
+ }
+ }
+
if (routedTableIds.isEmpty()) {
routedTableIds.add(sourceTableId);
}
@@ -177,13 +195,36 @@ public class TableIdRouter {
if (routes.isEmpty()) {
return new ArrayList<>();
}
- return routes.stream()
- .map(
- route ->
- tableIdSet.stream()
- .filter(tableId -> matches(route.f0,
tableId))
- .collect(Collectors.toSet()))
- .collect(Collectors.toList());
+
+ if (routeMode == RouteMode.ALL_MATCH) {
+ return routes.stream()
+ .map(
+ route ->
+ tableIdSet.stream()
+ .filter(tableId ->
matches(route.f0, tableId))
+ .collect(Collectors.toSet()))
+ .collect(Collectors.toList());
+ } else if (routeMode == RouteMode.FIRST_MATCH) {
+ Map<TableId, Integer> matchingTableIds = new HashMap<>();
+ for (TableId tableId : tableIdSet) {
+ for (int i = 0; i < routes.size(); i++) {
+ if
(routes.get(i).f0.matcher(tableId.toString()).matches()) {
+ matchingTableIds.put(tableId, i);
+ break;
+ }
+ }
+ }
+ List<Set<TableId>> routeGroups = new ArrayList<>(routes.size());
+ for (int i = 0; i < routes.size(); i++) {
+ routeGroups.add(new HashSet<>());
+ }
+ for (Map.Entry<TableId, Integer> entry :
matchingTableIds.entrySet()) {
+ routeGroups.get(entry.getValue()).add(entry.getKey());
+ }
+ return routeGroups;
+ } else {
+ throw new IllegalArgumentException("Unexpected route mode: " +
routeMode);
+ }
}
private static boolean matches(Pattern pattern, TableId tableId) {
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java
index 985613e13..de31732b2 100644
---
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.composer.definition;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.pipeline.RuntimeExecutionMode;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.composer.PipelineComposer;
@@ -32,6 +33,7 @@ import java.util.TimeZone;
import static
org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_EXECUTION_RUNTIME_MODE;
import static
org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
+import static
org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_ROUTE_MODE;
/**
* Definition of a pipeline.
@@ -100,6 +102,10 @@ public class PipelineDef {
return routes;
}
+ public RouteMode getRouteMode() {
+ return config.get(PIPELINE_ROUTE_MODE);
+ }
+
public List<TransformDef> getTransforms() {
return transforms;
}
@@ -125,6 +131,8 @@ public class PipelineDef {
+ sink
+ ", routes="
+ routes
+ + ", routeMode="
+ + getRouteMode()
+ ", transforms="
+ transforms
+ ", udfs="
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index 6de9045af..1854b0503 100644
---
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -220,7 +220,8 @@ public class FlinkPipelineComposer implements
PipelineComposer {
pipelineDef
.getSink()
.getIncludedSchemaEvolutionTypes()),
- pipelineDef.getRoute());
+ pipelineDef.getRoute(),
+ pipelineDef.getRouteMode());
} else {
// Translate a regular topology for sources without distributed
tables
@@ -235,7 +236,8 @@ public class FlinkPipelineComposer implements
PipelineComposer {
pipelineDef
.getSink()
.getIncludedSchemaEvolutionTypes()),
- pipelineDef.getRoute());
+ pipelineDef.getRoute(),
+ pipelineDef.getRouteMode());
// Schema Operator ---(shuffled)---> Partitioning
stream =
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
index 8f31803b3..7340b3e80 100644
---
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
@@ -18,7 +18,9 @@
package org.apache.flink.cdc.composer.flink.translator;
import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -55,29 +57,44 @@ public class SchemaOperatorTranslator {
this.timezone = timezone;
}
+ @VisibleForTesting
public DataStream<Event> translateRegular(
DataStream<Event> input,
int parallelism,
MetadataApplier metadataApplier,
List<RouteDef> routes) {
- return translateRegular(input, parallelism, false, metadataApplier,
routes);
+ return translateRegular(
+ input, parallelism, false, metadataApplier, routes,
RouteMode.ALL_MATCH);
}
+ @VisibleForTesting
public DataStream<Event> translateRegular(
DataStream<Event> input,
int parallelism,
boolean isBatchMode,
MetadataApplier metadataApplier,
List<RouteDef> routes) {
+ return translateRegular(
+ input, parallelism, isBatchMode, metadataApplier, routes,
RouteMode.ALL_MATCH);
+ }
+
+ public DataStream<Event> translateRegular(
+ DataStream<Event> input,
+ int parallelism,
+ boolean isBatchMode,
+ MetadataApplier metadataApplier,
+ List<RouteDef> routes,
+ RouteMode routeMode) {
return isBatchMode
? addRegularSchemaBatchOperator(
- input, parallelism, metadataApplier, routes, timezone)
+ input, parallelism, metadataApplier, routes,
routeMode, timezone)
: addRegularSchemaOperator(
input,
parallelism,
metadataApplier,
routes,
+ routeMode,
schemaChangeBehavior,
timezone);
}
@@ -86,9 +103,16 @@ public class SchemaOperatorTranslator {
DataStream<PartitioningEvent> input,
int parallelism,
MetadataApplier metadataApplier,
- List<RouteDef> routes) {
+ List<RouteDef> routes,
+ RouteMode routeMode) {
return addDistributedSchemaOperator(
- input, parallelism, metadataApplier, routes,
schemaChangeBehavior, timezone);
+ input,
+ parallelism,
+ metadataApplier,
+ routes,
+ routeMode,
+ schemaChangeBehavior,
+ timezone);
}
@Deprecated
@@ -101,6 +125,7 @@ public class SchemaOperatorTranslator {
int parallelism,
MetadataApplier metadataApplier,
List<RouteDef> routes,
+ RouteMode routeMode,
SchemaChangeBehavior schemaChangeBehavior,
String timezone) {
List<RouteRule> routingRules = new ArrayList<>();
@@ -118,6 +143,7 @@ public class SchemaOperatorTranslator {
new SchemaOperatorFactory(
metadataApplier,
routingRules,
+ routeMode,
rpcTimeOut,
schemaChangeBehavior,
timezone));
@@ -130,6 +156,7 @@ public class SchemaOperatorTranslator {
int parallelism,
MetadataApplier metadataApplier,
List<RouteDef> routes,
+ RouteMode routeMode,
String timezone) {
List<RouteRule> routingRules = new ArrayList<>();
for (RouteDef route : routes) {
@@ -143,7 +170,8 @@ public class SchemaOperatorTranslator {
input.transform(
"SchemaBatchOperator",
new EventTypeInfo(),
- new BatchSchemaOperator(routingRules, metadataApplier,
timezone));
+ new BatchSchemaOperator(
+ routingRules, routeMode, metadataApplier,
timezone));
stream.uid(schemaOperatorUid).setParallelism(parallelism);
return stream;
}
@@ -153,6 +181,7 @@ public class SchemaOperatorTranslator {
int parallelism,
MetadataApplier metadataApplier,
List<RouteDef> routes,
+ RouteMode routeMode,
SchemaChangeBehavior schemaChangeBehavior,
String timezone) {
Preconditions.checkArgument(
@@ -181,6 +210,7 @@ public class SchemaOperatorTranslator {
.SchemaOperatorFactory(
metadataApplier,
routingRules,
+ routeMode,
rpcTimeOut,
schemaChangeBehavior,
timezone))
diff --git
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index d01d987d4..7a01193c3 100644
---
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
@@ -1588,6 +1589,146 @@ class FlinkPipelineComposerITCase {
return events;
}
+ @ParameterizedTest
+ @EnumSource
+ void testRouteModeFirstMatch(ValuesDataSink.SinkApi sinkApi) throws
Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup value source
+ Configuration sourceConfig = new Configuration();
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES);
+ SourceDef sourceDef =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source",
sourceConfig);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+ sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value
Sink", sinkConfig);
+
+ TableId routedTable1 = TableId.tableId("default_namespace",
"default_schema", "routed1");
+ TableId routedTable2 = TableId.tableId("default_namespace",
"default_schema", "routed2");
+ TableId routedAll = TableId.tableId("default_namespace",
"default_schema", "routed_all");
+ List<RouteDef> routeDef =
+ Arrays.asList(
+ new RouteDef(TABLE_1.toString(),
routedTable1.toString(), null, null),
+ new RouteDef(TABLE_2.toString(),
routedTable2.toString(), null, null),
+ new RouteDef(
+ "default_namespace.default_schema.table\\.*",
+ routedAll.toString(),
+ null,
+ null));
+
+ // Setup pipeline with first-match route mode
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
+ pipelineConfig.set(PipelineOptions.PIPELINE_ROUTE_MODE,
RouteMode.FIRST_MATCH);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ routeDef,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ execution.execute();
+
+ // Check result in ValuesDatabase
+ // With first-match mode, tables should only be routed to their first
matching route
+ List<String> routed1Results = ValuesDatabase.getResults(routedTable1);
+ assertThat(routed1Results)
+ .contains(
+
"default_namespace.default_schema.routed1:col1=2;newCol3=x",
+
"default_namespace.default_schema.routed1:col1=3;newCol3=");
+ List<String> routed2Results = ValuesDatabase.getResults(routedTable2);
+ assertThat(routed2Results)
+ .contains(
+
"default_namespace.default_schema.routed2:col1=1;col2=1",
+
"default_namespace.default_schema.routed2:col1=2;col2=2",
+
"default_namespace.default_schema.routed2:col1=3;col2=3");
+
+ List<String> allResults = ValuesDatabase.getAllResults();
+ assertThat(allResults).noneMatch(result ->
result.startsWith(routedAll.toString()));
+ }
+
+ @ParameterizedTest
+ @EnumSource
+ void testRouteModeAllMatch(ValuesDataSink.SinkApi sinkApi) throws
Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup value source
+ Configuration sourceConfig = new Configuration();
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES);
+ SourceDef sourceDef =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source",
sourceConfig);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+ sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value
Sink", sinkConfig);
+
+ TableId routedTable1 = TableId.tableId("default_namespace",
"default_schema", "routed1");
+ TableId routedTable2 = TableId.tableId("default_namespace",
"default_schema", "routed2");
+ TableId routedAll = TableId.tableId("default_namespace",
"default_schema", "routed_all");
+ List<RouteDef> routeDef =
+ Arrays.asList(
+ new RouteDef(TABLE_1.toString(),
routedTable1.toString(), null, null),
+ new RouteDef(TABLE_2.toString(),
routedTable2.toString(), null, null),
+ new RouteDef(
+ "default_namespace.default_schema.table\\.*",
+ routedAll.toString(),
+ null,
+ null));
+
+ // Setup pipeline with all-match route mode
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
+ pipelineConfig.set(PipelineOptions.PIPELINE_ROUTE_MODE,
RouteMode.ALL_MATCH);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ routeDef,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ execution.execute();
+
+ // Check result in ValuesDatabase
+ // With all-match mode, tables should be routed to all matching routes
+ List<String> routed1Results = ValuesDatabase.getResults(routedTable1);
+ assertThat(routed1Results)
+ .contains(
+
"default_namespace.default_schema.routed1:col1=2;newCol3=x",
+
"default_namespace.default_schema.routed1:col1=3;newCol3=");
+ List<String> routed2Results = ValuesDatabase.getResults(routedTable2);
+ assertThat(routed2Results)
+ .contains(
+
"default_namespace.default_schema.routed2:col1=1;col2=1",
+
"default_namespace.default_schema.routed2:col1=2;col2=2",
+
"default_namespace.default_schema.routed2:col1=3;col2=3");
+
+ List<String> routedAllResults = ValuesDatabase.getResults(routedAll);
+ assertThat(routedAllResults).isNotEmpty();
+ assertThat(routedAllResults.stream().filter(s ->
s.contains("routed_all")).count())
+ .isGreaterThan(0);
+ }
+
BinaryRecordData generate(Schema schema, Object... fields) {
return (new
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
.generate(
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
index c9169388d..870ddfa72 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
@@ -1183,4 +1183,145 @@ class RouteE2eITCase extends PipelineTestEnvironment {
.toArray(String[]::new));
extremeRouteTestDatabase.dropDatabase();
}
+
+ @ParameterizedTest(name = "isFirstMatch: {0}")
+ @ValueSource(booleans = {true, false})
+ void testMultipleRouteWithRouteMode(boolean isFirstMatch) throws Exception
{
+ String pipelineJob =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: %s\n"
+ + " port: 3306\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.\\.*\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
+ + "\n"
+ + "sink:\n"
+ + " type: values\n"
+ + "route:\n"
+ + " - source-table:
%s.(TABLEALPHA|TABLEBETA)\n"
+ + " sink-table: NEW_%s.ALPHABET\n"
+ + " - source-table:
%s.(TABLEBETA|TABLEGAMMA)\n"
+ + " sink-table: NEW_%s.BETAGAMM\n"
+ + "\n"
+ + "pipeline:\n"
+ + " route-mode: %s\n"
+ + " parallelism: %d",
+ INTER_CONTAINER_MYSQL_ALIAS,
+ MYSQL_TEST_USER,
+ MYSQL_TEST_PASSWORD,
+ routeTestDatabase.getDatabaseName(),
+ routeTestDatabase.getDatabaseName(),
+ routeTestDatabase.getDatabaseName(),
+ routeTestDatabase.getDatabaseName(),
+ routeTestDatabase.getDatabaseName(),
+ isFirstMatch ? "FIRST_MATCH" : "ALL_MATCH",
+ parallelism);
+ submitPipelineJob(pipelineJob);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ if (isFirstMatch) {
+ validateResult(
+ routeDbNameFormatter,
+ "CreateTableEvent{tableId=NEW_%s.ALPHABET,
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID,
options=()}",
+ "CreateTableEvent{tableId=NEW_%s.BETAGAMM,
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID,
options=()}",
+ "CreateTableEvent{tableId=%s.TABLEDELTA,
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID,
options=()}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[1008, 8], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[1009, 8.1], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[1010, 10], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[1011, 11], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[2011, 11], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[2012, 12], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[2013, 13], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[2014, 14], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[],
after=[3015, Amber], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[],
after=[3016, Black], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[],
after=[3017, Cyan], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[],
after=[3018, Denim], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEDELTA, before=[],
after=[4019, Yosemite], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEDELTA, before=[],
after=[4020, El Capitan], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEDELTA, before=[],
after=[4021, Sierra], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEDELTA, before=[],
after=[4022, High Sierra], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEDELTA, before=[],
after=[4023, Mojave], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEDELTA, before=[],
after=[4024, Catalina], op=INSERT, meta=()}");
+
+ } else {
+ validateResult(
+ routeDbNameFormatter,
+ "CreateTableEvent{tableId=NEW_%s.ALPHABET,
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID,
options=()}",
+ "CreateTableEvent{tableId=NEW_%s.BETAGAMM,
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID,
options=()}",
+ "CreateTableEvent{tableId=%s.TABLEDELTA,
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID,
options=()}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[1008, 8], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[1009, 8.1], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[1010, 10], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[1011, 11], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[2011, 11], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[2012, 12], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[2013, 13], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[2014, 14], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[],
after=[2011, 11], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[],
after=[2012, 12], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[],
after=[2013, 13], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[],
after=[2014, 14], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[],
after=[3015, Amber], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[],
after=[3016, Black], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[],
after=[3017, Cyan], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[],
after=[3018, Denim], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEDELTA, before=[],
after=[4019, Yosemite], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEDELTA, before=[],
after=[4020, El Capitan], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEDELTA, before=[],
after=[4021, Sierra], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEDELTA, before=[],
after=[4022, High Sierra], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEDELTA, before=[],
after=[4023, Mojave], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEDELTA, before=[],
after=[4024, Catalina], op=INSERT, meta=()}");
+ }
+
+ generateIncrementalChanges();
+ if (isFirstMatch) {
+ validateResult(
+ routeDbNameFormatter,
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[3007, 7], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[2014,
14], after=[2014, 2014], op=UPDATE, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[],
after=[3019, Emerald], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEDELTA, before=[4024,
Catalina], after=[], op=DELETE, meta=()}");
+
+ } else {
+ validateResult(
+ routeDbNameFormatter,
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[3007, 7], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[2014,
14], after=[2014, 2014], op=UPDATE, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[2014,
14], after=[2014, 2014], op=UPDATE, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[],
after=[3019, Emerald], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.TABLEDELTA, before=[4024,
Catalina], after=[], op=DELETE, meta=()}");
+ }
+
+ generateSchemaChanges();
+
+ if (isFirstMatch) {
+ validateResult(
+ routeDbNameFormatter,
+ "AddColumnEvent{tableId=NEW_%s.ALPHABET,
addedColumns=[ColumnWithPosition{column=`NAME` VARCHAR(17), position=AFTER,
existedColumnName=VERSION}]}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[10001, 12, Derrida], op=INSERT, meta=()}",
+ "AddColumnEvent{tableId=NEW_%s.ALPHABET,
addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17),
position=AFTER, existedColumnName=NAME}]}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[10002, null, null, 15], op=INSERT, meta=()}",
+ "RenameColumnEvent{tableId=NEW_%s.BETAGAMM,
nameMapping={VERSION=VERSION_EX}}",
+ "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[],
after=[10003, Fluorite], op=INSERT, meta=()}",
+ "DropColumnEvent{tableId=%s.TABLEDELTA,
droppedColumnNames=[VERSION]}",
+ "DataChangeEvent{tableId=%s.TABLEDELTA, before=[],
after=[10004], op=INSERT, meta=()}");
+ } else {
+ validateResult(
+ routeDbNameFormatter,
+ "AddColumnEvent{tableId=NEW_%s.ALPHABET,
addedColumns=[ColumnWithPosition{column=`NAME` VARCHAR(17), position=AFTER,
existedColumnName=VERSION}]}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[10001, 12, Derrida], op=INSERT, meta=()}",
+ "AddColumnEvent{tableId=NEW_%s.ALPHABET,
addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17),
position=AFTER, existedColumnName=NAME}]}",
+ "AddColumnEvent{tableId=NEW_%s.BETAGAMM,
addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17),
position=AFTER, existedColumnName=VERSION}]}",
+ "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[],
after=[10002, null, null, 15], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[],
after=[10002, null, 15], op=INSERT, meta=()}",
+ "AlterColumnTypeEvent{tableId=NEW_%s.BETAGAMM,
typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}}",
+ "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[],
after=[10003, null, Fluorite], op=INSERT, meta=()}",
+ "DropColumnEvent{tableId=%s.TABLEDELTA,
droppedColumnNames=[VERSION]}",
+ "DataChangeEvent{tableId=%s.TABLEDELTA, before=[],
after=[10004], op=INSERT, meta=()}");
+ }
+ }
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
index 640fb765c..af0cd6f4c 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.runtime.operators.schema.common;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.route.TableIdRouter;
@@ -87,6 +88,7 @@ public abstract class SchemaRegistry implements
OperatorCoordinator, Coordinatio
protected final MetadataApplier metadataApplier;
protected final Duration rpcTimeout;
protected final List<RouteRule> routingRules;
+ protected final RouteMode routeMode;
protected final SchemaChangeBehavior behavior;
// -------------------------
@@ -104,6 +106,7 @@ public abstract class SchemaRegistry implements
OperatorCoordinator, Coordinatio
ExecutorService coordinatorExecutor,
MetadataApplier metadataApplier,
List<RouteRule> routingRules,
+ RouteMode routeMode,
SchemaChangeBehavior schemaChangeBehavior,
Duration rpcTimeout) {
this.context = context;
@@ -111,6 +114,7 @@ public abstract class SchemaRegistry implements
OperatorCoordinator, Coordinatio
this.coordinatorExecutor = coordinatorExecutor;
this.metadataApplier = metadataApplier;
this.routingRules = routingRules;
+ this.routeMode = routeMode;
this.rpcTimeout = rpcTimeout;
this.behavior = schemaChangeBehavior;
}
@@ -127,7 +131,7 @@ public abstract class SchemaRegistry implements
OperatorCoordinator, Coordinatio
if (this.schemaManager == null) {
this.schemaManager = new SchemaManager();
}
- this.router = new TableIdRouter(routingRules);
+ this.router = new TableIdRouter(routingRules, routeMode);
}
@Override
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java
index 8626eafbc..f9aad19b8 100755
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Schema;
@@ -109,6 +110,7 @@ public class SchemaCoordinator extends SchemaRegistry {
ExecutorService coordinatorExecutor,
MetadataApplier metadataApplier,
List<RouteRule> routingRules,
+ RouteMode routeMode,
SchemaChangeBehavior schemaChangeBehavior,
Duration rpcTimeout) {
super(
@@ -117,6 +119,7 @@ public class SchemaCoordinator extends SchemaRegistry {
coordinatorExecutor,
metadataApplier,
routingRules,
+ routeMode,
schemaChangeBehavior,
rpcTimeout);
this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinatorProvider.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinatorProvider.java
index f028f4cc9..a5cf25a7f 100755
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinatorProvider.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinatorProvider.java
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.runtime.operators.schema.distributed;
import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -39,6 +40,7 @@ public class SchemaCoordinatorProvider implements
OperatorCoordinator.Provider {
private final String operatorName;
private final MetadataApplier metadataApplier;
private final List<RouteRule> routingRules;
+ private final RouteMode routeMode;
private final SchemaChangeBehavior schemaChangeBehavior;
private final Duration rpcTimeout;
@@ -47,12 +49,14 @@ public class SchemaCoordinatorProvider implements
OperatorCoordinator.Provider {
String operatorName,
MetadataApplier metadataApplier,
List<RouteRule> routingRules,
+ RouteMode routeMode,
SchemaChangeBehavior schemaChangeBehavior,
Duration rpcTimeout) {
this.operatorID = operatorID;
this.operatorName = operatorName;
this.metadataApplier = metadataApplier;
this.routingRules = routingRules;
+ this.routeMode = routeMode;
this.schemaChangeBehavior = schemaChangeBehavior;
this.rpcTimeout = rpcTimeout;
}
@@ -75,6 +79,7 @@ public class SchemaCoordinatorProvider implements
OperatorCoordinator.Provider {
coordinatorExecutor,
metadataApplier,
routingRules,
+ routeMode,
schemaChangeBehavior,
rpcTimeout);
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
index 9ec2b2fc9..7964efd3f 100755
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
@@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.route.TableIdRouter;
@@ -71,13 +72,16 @@ public class SchemaOperator extends
AbstractStreamOperatorAdapter<Event>
private final String timezone;
private final SchemaChangeBehavior schemaChangeBehavior;
private final List<RouteRule> routingRules;
+ private final RouteMode routeMode;
public SchemaOperator(
List<RouteRule> routingRules,
+ RouteMode routeMode,
Duration rpcTimeOut,
SchemaChangeBehavior schemaChangeBehavior,
String timezone) {
this.routingRules = routingRules;
+ this.routeMode = routeMode;
this.rpcTimeOut = rpcTimeOut;
this.schemaChangeBehavior = schemaChangeBehavior;
this.timezone = timezone;
@@ -100,7 +104,7 @@ public class SchemaOperator extends
AbstractStreamOperatorAdapter<Event>
subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
upstreamSchemaTable = HashBasedTable.create();
evolvedSchemaMap = new HashMap<>();
- tableIdRouter = new TableIdRouter(routingRules);
+ tableIdRouter = new TableIdRouter(routingRules, routeMode);
derivator = new SchemaDerivator();
this.schemaOperatorMetrics =
new SchemaOperatorMetrics(
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperatorFactory.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperatorFactory.java
index 52378c15f..5652f28ad 100755
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperatorFactory.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperatorFactory.java
@@ -19,6 +19,7 @@ package
org.apache.flink.cdc.runtime.operators.schema.distributed;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -41,18 +42,23 @@ public class SchemaOperatorFactory extends
SimpleOperatorFactory<Event>
private final MetadataApplier metadataApplier;
private final List<RouteRule> routingRules;
+ private final RouteMode routeMode;
private final SchemaChangeBehavior schemaChangeBehavior;
private final Duration rpcTimeout;
public SchemaOperatorFactory(
MetadataApplier metadataApplier,
List<RouteRule> routingRules,
+ RouteMode routeMode,
Duration rpcTimeout,
SchemaChangeBehavior schemaChangeBehavior,
String timezone) {
- super(new SchemaOperator(routingRules, rpcTimeout,
schemaChangeBehavior, timezone));
+ super(
+ new SchemaOperator(
+ routingRules, routeMode, rpcTimeout,
schemaChangeBehavior, timezone));
this.metadataApplier = metadataApplier;
this.routingRules = routingRules;
+ this.routeMode = routeMode;
this.schemaChangeBehavior = schemaChangeBehavior;
this.rpcTimeout = rpcTimeout;
}
@@ -65,6 +71,7 @@ public class SchemaOperatorFactory extends
SimpleOperatorFactory<Event>
operatorName,
metadataApplier,
routingRules,
+ routeMode,
schemaChangeBehavior,
rpcTimeout);
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java
index 8ed7d0916..e830b1f2a 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java
@@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.route.TableIdRouter;
@@ -58,6 +59,7 @@ public class BatchSchemaOperator extends
AbstractStreamOperatorAdapter<Event>
// Final fields that are set in constructor
private final String timezone;
private final List<RouteRule> routingRules;
+ private final RouteMode routeMode;
// Transient fields that are set during open()
private transient volatile Map<TableId, Schema> originalSchemaMap;
@@ -69,10 +71,14 @@ public class BatchSchemaOperator extends
AbstractStreamOperatorAdapter<Event>
private boolean alreadyMergedCreateTableTables = false;
public BatchSchemaOperator(
- List<RouteRule> routingRules, MetadataApplier metadataApplier,
String timezone) {
+ List<RouteRule> routingRules,
+ RouteMode routeMode,
+ MetadataApplier metadataApplier,
+ String timezone) {
this.chainingStrategy = ChainingStrategy.ALWAYS;
this.timezone = timezone;
this.routingRules = routingRules;
+ this.routeMode = routeMode;
this.metadataApplier = metadataApplier;
}
@@ -89,7 +95,7 @@ public class BatchSchemaOperator extends
AbstractStreamOperatorAdapter<Event>
super.open();
this.originalSchemaMap = new HashMap<>();
this.evolvedSchemaMap = new HashMap<>();
- this.router = new TableIdRouter(routingRules);
+ this.router = new TableIdRouter(routingRules, routeMode);
this.derivator = new SchemaDerivator();
this.schemaManager = new SchemaManager(SchemaChangeBehavior.IGNORE);
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
index 53344dc87..3a241aac4 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
@@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Schema;
@@ -100,6 +101,7 @@ public class SchemaCoordinator extends SchemaRegistry {
ExecutorService coordinatorExecutor,
MetadataApplier metadataApplier,
List<RouteRule> routes,
+ RouteMode routeMode,
SchemaChangeBehavior schemaChangeBehavior,
Duration rpcTimeout) {
super(
@@ -108,6 +110,7 @@ public class SchemaCoordinator extends SchemaRegistry {
coordinatorExecutor,
metadataApplier,
routes,
+ routeMode,
schemaChangeBehavior,
rpcTimeout);
this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinatorProvider.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinatorProvider.java
index 253b52cac..cedc9d37c 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinatorProvider.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinatorProvider.java
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.runtime.operators.schema.regular;
import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -39,6 +40,7 @@ public class SchemaCoordinatorProvider implements
OperatorCoordinator.Provider {
private final String operatorName;
private final MetadataApplier metadataApplier;
private final List<RouteRule> routingRules;
+ private final RouteMode routeMode;
private final SchemaChangeBehavior schemaChangeBehavior;
private final Duration rpcTimeout;
@@ -47,12 +49,14 @@ public class SchemaCoordinatorProvider implements
OperatorCoordinator.Provider {
String operatorName,
MetadataApplier metadataApplier,
List<RouteRule> routingRules,
+ RouteMode routeMode,
SchemaChangeBehavior schemaChangeBehavior,
Duration rpcTimeout) {
this.operatorID = operatorID;
this.operatorName = operatorName;
this.metadataApplier = metadataApplier;
this.routingRules = routingRules;
+ this.routeMode = routeMode;
this.schemaChangeBehavior = schemaChangeBehavior;
this.rpcTimeout = rpcTimeout;
}
@@ -75,6 +79,7 @@ public class SchemaCoordinatorProvider implements
OperatorCoordinator.Provider {
coordinatorExecutor,
metadataApplier,
routingRules,
+ routeMode,
schemaChangeBehavior,
rpcTimeout);
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
index 7dd9a210a..b659b7234 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
@@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.route.TableIdRouter;
@@ -76,6 +77,7 @@ public class SchemaOperator extends
AbstractStreamOperatorAdapter<Event>
private final Duration rpcTimeout;
private final SchemaChangeBehavior schemaChangeBehavior;
private final List<RouteRule> routingRules;
+ private final RouteMode routeMode;
// Transient fields that are set during open()
private transient int subTaskId;
@@ -88,24 +90,26 @@ public class SchemaOperator extends
AbstractStreamOperatorAdapter<Event>
@VisibleForTesting
public SchemaOperator(List<RouteRule> routingRules) {
- this(routingRules, DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
+ this(routingRules, RouteMode.ALL_MATCH,
DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
}
@VisibleForTesting
- public SchemaOperator(List<RouteRule> routingRules, Duration rpcTimeOut) {
- this(routingRules, rpcTimeOut, SchemaChangeBehavior.EVOLVE);
+ public SchemaOperator(List<RouteRule> routingRules, RouteMode routeMode,
Duration rpcTimeOut) {
+ this(routingRules, routeMode, rpcTimeOut, SchemaChangeBehavior.EVOLVE);
}
@VisibleForTesting
public SchemaOperator(
List<RouteRule> routingRules,
+ RouteMode routeMode,
Duration rpcTimeOut,
SchemaChangeBehavior schemaChangeBehavior) {
- this(routingRules, rpcTimeOut, schemaChangeBehavior, "UTC");
+ this(routingRules, routeMode, rpcTimeOut, schemaChangeBehavior, "UTC");
}
public SchemaOperator(
List<RouteRule> routingRules,
+ RouteMode routeMode,
Duration rpcTimeOut,
SchemaChangeBehavior schemaChangeBehavior,
String timezone) {
@@ -114,6 +118,7 @@ public class SchemaOperator extends
AbstractStreamOperatorAdapter<Event>
this.schemaChangeBehavior = schemaChangeBehavior;
this.timezone = timezone;
this.routingRules = routingRules;
+ this.routeMode = routeMode;
}
@Override
@@ -134,7 +139,7 @@ public class SchemaOperator extends
AbstractStreamOperatorAdapter<Event>
this.subTaskId =
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
this.originalSchemaMap = new HashMap<>();
this.evolvedSchemaMap = new HashMap<>();
- this.router = new TableIdRouter(routingRules);
+ this.router = new TableIdRouter(routingRules, routeMode);
this.derivator = new SchemaDerivator();
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java
index 630acc904..6afc3a138 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.runtime.operators.schema.regular;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -40,18 +41,23 @@ public class SchemaOperatorFactory extends
SimpleOperatorFactory<Event>
private final MetadataApplier metadataApplier;
private final List<RouteRule> routingRules;
+ private final RouteMode routeMode;
private final SchemaChangeBehavior schemaChangeBehavior;
private final Duration rpcTimeout;
public SchemaOperatorFactory(
MetadataApplier metadataApplier,
List<RouteRule> routingRules,
+ RouteMode routeMode,
Duration rpcTimeout,
SchemaChangeBehavior schemaChangeBehavior,
String timezone) {
- super(new SchemaOperator(routingRules, rpcTimeout,
schemaChangeBehavior, timezone));
+ super(
+ new SchemaOperator(
+ routingRules, routeMode, rpcTimeout,
schemaChangeBehavior, timezone));
this.metadataApplier = metadataApplier;
this.routingRules = routingRules;
+ this.routeMode = routeMode;
this.schemaChangeBehavior = schemaChangeBehavior;
this.rpcTimeout = rpcTimeout;
}
@@ -64,6 +70,7 @@ public class SchemaOperatorFactory extends
SimpleOperatorFactory<Event>
operatorName,
metadataApplier,
routingRules,
+ routeMode,
schemaChangeBehavior,
rpcTimeout);
}
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java
index b8e487700..abfc475df 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.common.route;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaTestBase;
import org.junit.jupiter.api.Test;
@@ -166,7 +167,9 @@ public class TableIdRouterTest extends SchemaTestBase {
private static List<String> testStdRegExpRoute(
String sourceRouteRule, String sinkRouteRule, List<String>
sourceTables) {
TableIdRouter router =
- new TableIdRouter(List.of(new RouteRule(sourceRouteRule,
sinkRouteRule)));
+ new TableIdRouter(
+ List.of(new RouteRule(sourceRouteRule, sinkRouteRule)),
+ RouteMode.ALL_MATCH);
return sourceTables.stream()
.map(TableId::parse)
.map(router::route)
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java
index 40b40b468..55d612b86 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.route.TableIdRouter;
@@ -615,7 +616,8 @@ public class SchemaDerivatorTest extends SchemaTestBase {
new RouteRule("db_3.table_\\.*",
"db_3.table_merged"),
// Broadcast tables
new RouteRule("db_4.table_1", "db_4.table_a"),
- new RouteRule("db_4.table_1",
"db_4.table_b")));
+ new RouteRule("db_4.table_1", "db_4.table_b")),
+ RouteMode.ALL_MATCH);
List<CreateTableEvent> createTableEvents =
Arrays.asList(
new CreateTableEvent(
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java
index af273d13d..42c78b030 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java
@@ -22,6 +22,7 @@ import
org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.route.TableIdRouter;
import org.apache.flink.cdc.common.types.DataType;
@@ -72,7 +73,8 @@ public abstract class SchemaTestBase {
null),
new RouteRule("(inv_\\d+).(table_\\.*)", "$2.$1", null));
- protected static final TableIdRouter TABLE_ID_ROUTER = new
TableIdRouter(ROUTING_RULES);
+ protected static final TableIdRouter TABLE_ID_ROUTER =
+ new TableIdRouter(ROUTING_RULES, RouteMode.ALL_MATCH);
protected static BinaryRecordData genBinRec(String rowType, Object...
fields) {
return (new BinaryRecordDataGenerator(quickGenRow(rowType).toArray(new
DataType[0])))
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterMatchModeTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterMatchModeTest.java
new file mode 100644
index 000000000..14555947e
--- /dev/null
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterMatchModeTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.schema.common;
+
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
+import org.apache.flink.cdc.common.route.RouteRule;
+import org.apache.flink.cdc.common.route.TableIdRouter;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+/** Unit test for {@link TableIdRouter} with match-mode support. */
+public class TableIdRouterMatchModeTest {
+
+ @Test
+ void testFirstMatchMode() {
+ // Setup routing rules for first-match mode
+ List<RouteRule> routingRules =
+ Arrays.asList(
+ // Sharded tables should be merged
+ new RouteRule("mydb.order_\\.*", "ods_db.ods_orders"),
+ new RouteRule("mydb.product_\\.*",
"ods_db.ods_products"),
+ // Catch-all rule for one-to-one mapping
+ new RouteRule("mydb.\\.*", "ods_db.ods_<>", "<>"));
+
+ TableIdRouter router = new TableIdRouter(routingRules,
RouteMode.FIRST_MATCH);
+
+ // Test sharded order tables - should match first rule and stop
+ assertThat(route(router,
"mydb.order_1")).containsExactly("ods_db.ods_orders");
+ assertThat(route(router,
"mydb.order_2")).containsExactly("ods_db.ods_orders");
+ assertThat(route(router,
"mydb.order_100")).containsExactly("ods_db.ods_orders");
+
+ // Test sharded product tables - should match second rule and stop
+ assertThat(route(router,
"mydb.product_1")).containsExactly("ods_db.ods_products");
+ assertThat(route(router,
"mydb.product_2")).containsExactly("ods_db.ods_products");
+
+ // Test non-sharded tables - should match third rule (catch-all)
+ assertThat(route(router,
"mydb.user")).containsExactly("ods_db.ods_user");
+ assertThat(route(router,
"mydb.customer")).containsExactly("ods_db.ods_customer");
+ assertThat(route(router,
"mydb.config")).containsExactly("ods_db.ods_config");
+ }
+
+ @Test
+ void testAllMatchMode() {
+ // Setup routing rules for all-match mode (default behavior)
+ List<RouteRule> routingRules =
+ Arrays.asList(
+ new RouteRule("mydb.order_\\.*", "ods_db.ods_orders"),
+ new RouteRule("mydb.\\.*", "ods_db.ods_<>", "<>"));
+
+ TableIdRouter router = new TableIdRouter(routingRules,
RouteMode.ALL_MATCH);
+
+ // Test sharded order tables - should match BOTH rules
+ assertThat(route(router, "mydb.order_1"))
+ .containsExactlyInAnyOrder("ods_db.ods_orders",
"ods_db.ods_order_1");
+ assertThat(route(router, "mydb.order_2"))
+ .containsExactlyInAnyOrder("ods_db.ods_orders",
"ods_db.ods_order_2");
+
+ // Test non-sharded tables - should match only second rule
+ assertThat(route(router,
"mydb.user")).containsExactly("ods_db.ods_user");
+ }
+
+ @Test
+ void testFirstMatchWithNoMatchingRules() {
+ List<RouteRule> routingRules =
+ Arrays.asList(
+ new RouteRule("mydb.order_\\.*", "ods_db.ods_orders"),
+ new RouteRule("mydb.product_\\.*",
"ods_db.ods_products"));
+
+ TableIdRouter router = new TableIdRouter(routingRules,
RouteMode.FIRST_MATCH);
+
+ // Table that doesn't match any rule should route to itself (implicit
routing)
+ assertThat(route(router,
"otherdb.user")).containsExactly("otherdb.user");
+ }
+
+ @Test
+ void testAllMatchWithMultipleMatchingRules() {
+ // Setup multiple overlapping rules
+ List<RouteRule> routingRules =
+ Arrays.asList(
+ new RouteRule("db.table_\\.*", "db.merged_1"),
+ new RouteRule("db.table_\\.*", "db.merged_2"),
+ new RouteRule("db.table_\\.*", "db.merged_3"));
+
+ TableIdRouter router = new TableIdRouter(routingRules,
RouteMode.ALL_MATCH);
+
+ // Should match all three rules
+ assertThat(route(router, "db.table_1"))
+ .containsExactlyInAnyOrder("db.merged_1", "db.merged_2",
"db.merged_3");
+ }
+
+ private static List<String> route(TableIdRouter router, String tableId) {
+ return router.route(TableId.parse(tableId)).stream()
+ .map(TableId::toString)
+ .collect(Collectors.toList());
+ }
+
+ @Test
+ void testGroupSourceTablesByRouteRuleFirstMatch() {
+ List<RouteRule> routingRules =
+ Arrays.asList(
+ new RouteRule("mydb.order_\\.*", "ods_db.ods_orders"),
+ new RouteRule("mydb.product_\\.*",
"ods_db.ods_products"),
+ new RouteRule("mydb.\\.*", "ods_db.ods_<>", "<>"));
+
+ TableIdRouter router = new TableIdRouter(routingRules,
RouteMode.FIRST_MATCH);
+
+ Set<TableId> tableIdSet =
+ new HashSet<>(
+ Arrays.asList(
+ TableId.parse("mydb.order_1"),
+ TableId.parse("mydb.order_2"),
+ TableId.parse("mydb.order_100"),
+ TableId.parse("mydb.product_1"),
+ TableId.parse("mydb.product_2"),
+ TableId.parse("mydb.user"),
+ TableId.parse("mydb.customer"),
+ TableId.parse("mydb.config")));
+
+ List<Set<TableId>> groups =
router.groupSourceTablesByRouteRule(tableIdSet);
+
+ assertThat(groups).hasSize(3);
+
+ assertThat(groups.get(0))
+ .containsExactlyInAnyOrder(
+ TableId.parse("mydb.order_1"),
+ TableId.parse("mydb.order_2"),
+ TableId.parse("mydb.order_100"));
+
+ assertThat(groups.get(1))
+ .containsExactlyInAnyOrder(
+ TableId.parse("mydb.product_1"),
TableId.parse("mydb.product_2"));
+
+ assertThat(groups.get(2))
+ .containsExactlyInAnyOrder(
+ TableId.parse("mydb.user"),
+ TableId.parse("mydb.customer"),
+ TableId.parse("mydb.config"));
+ }
+
+ @Test
+ void testGroupSourceTablesByRouteRuleFirstMatchWithUnmatchedTables() {
+ List<RouteRule> routingRules =
+ Arrays.asList(
+ new RouteRule("mydb.order_\\.*", "ods_db.ods_orders"),
+ new RouteRule("mydb.product_\\.*",
"ods_db.ods_products"));
+
+ TableIdRouter router = new TableIdRouter(routingRules,
RouteMode.FIRST_MATCH);
+
+ Set<TableId> tableIdSet =
+ new HashSet<>(
+ Arrays.asList(
+ TableId.parse("mydb.order_1"),
+ TableId.parse("mydb.order_2"),
+ TableId.parse("mydb.product_1"),
+ TableId.parse("otherdb.user"),
+ TableId.parse("otherdb.customer")));
+
+ List<Set<TableId>> groups =
router.groupSourceTablesByRouteRule(tableIdSet);
+
+ assertThat(groups).hasSize(2);
+
+ // First group: order tables
+ assertThat(groups.get(0))
+ .containsExactlyInAnyOrder(
+ TableId.parse("mydb.order_1"),
TableId.parse("mydb.order_2"));
+
+ // Second group: product tables
+
assertThat(groups.get(1)).containsExactlyInAnyOrder(TableId.parse("mydb.product_1"));
+
+ // Unmatched tables (otherdb.user, otherdb.customer) are not in any
group
+
assertThat(groups.stream().flatMap(Set::stream).collect(Collectors.toList()))
+ .doesNotContain(TableId.parse("otherdb.user"),
TableId.parse("otherdb.customer"));
+ }
+
+ @Test
+ void testGroupSourceTablesByRouteRuleFirstMatchWithOverlappingRules() {
+ List<RouteRule> routingRules =
+ Arrays.asList(
+ new RouteRule("db.table_\\.*", "db.merged_1"),
+ new RouteRule("db.table_\\.*", "db.merged_2"),
+ new RouteRule("db.table_\\.*", "db.merged_3"));
+
+ TableIdRouter router = new TableIdRouter(routingRules,
RouteMode.FIRST_MATCH);
+
+ Set<TableId> tableIdSet =
+ new HashSet<>(
+ Arrays.asList(
+ TableId.parse("db.table_1"),
+ TableId.parse("db.table_2"),
+ TableId.parse("db.table_3")));
+
+ List<Set<TableId>> groups =
router.groupSourceTablesByRouteRule(tableIdSet);
+
+ assertThat(groups).hasSize(3);
+
+ // All tables should match only the first rule (FIRST_MATCH mode)
+ assertThat(groups.get(0))
+ .containsExactlyInAnyOrder(
+ TableId.parse("db.table_1"),
+ TableId.parse("db.table_2"),
+ TableId.parse("db.table_3"));
+
+ // Second and third groups should be empty
+ assertThat(groups.get(1)).isEmpty();
+ assertThat(groups.get(2)).isEmpty();
+ }
+}
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java
index 7ece5d20c..4245887f2 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
@@ -107,6 +108,7 @@ public class SchemaEvolveTest extends SchemaTestBase {
() ->
new SchemaOperator(
ROUTING_RULES,
+ RouteMode.ALL_MATCH,
Duration.ofMinutes(3),
SchemaChangeBehavior.LENIENT,
"UTC"),
@@ -247,6 +249,7 @@ public class SchemaEvolveTest extends SchemaTestBase {
() ->
new SchemaOperator(
ROUTING_RULES,
+ RouteMode.ALL_MATCH,
Duration.ofMinutes(3),
SchemaChangeBehavior.IGNORE,
"UTC"),
@@ -349,6 +352,7 @@ public class SchemaEvolveTest extends SchemaTestBase {
() ->
new SchemaOperator(
ROUTING_RULES,
+ RouteMode.ALL_MATCH,
Duration.ofMinutes(3),
SchemaChangeBehavior.EXCEPTION,
"UTC"),
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java
index 740c77bd4..7bdf37ed4 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java
@@ -32,6 +32,7 @@ import
org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
@@ -87,7 +88,8 @@ class SchemaEvolveTest {
SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE;
SchemaOperator schemaOperator =
- new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30),
behavior);
+ new SchemaOperator(
+ new ArrayList<>(), RouteMode.ALL_MATCH,
Duration.ofSeconds(30), behavior);
RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
RegularEventOperatorTestHarness.withDurationAndBehavior(
schemaOperator, 17, Duration.ofSeconds(3), behavior);
@@ -365,7 +367,8 @@ class SchemaEvolveTest {
SchemaChangeBehavior behavior = SchemaChangeBehavior.TRY_EVOLVE;
SchemaOperator schemaOperator =
- new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30),
behavior);
+ new SchemaOperator(
+ new ArrayList<>(), RouteMode.ALL_MATCH,
Duration.ofSeconds(30), behavior);
RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
RegularEventOperatorTestHarness.withDurationAndBehavior(
schemaOperator, 17, Duration.ofSeconds(3), behavior);
@@ -643,7 +646,8 @@ class SchemaEvolveTest {
SchemaChangeBehavior behavior = SchemaChangeBehavior.EXCEPTION;
SchemaOperator schemaOperator =
- new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30),
behavior);
+ new SchemaOperator(
+ new ArrayList<>(), RouteMode.ALL_MATCH,
Duration.ofSeconds(30), behavior);
RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
RegularEventOperatorTestHarness.withDurationAndBehavior(
schemaOperator, 17, Duration.ofSeconds(3), behavior);
@@ -737,7 +741,8 @@ class SchemaEvolveTest {
SchemaChangeBehavior behavior = SchemaChangeBehavior.IGNORE;
SchemaOperator schemaOperator =
- new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30),
behavior);
+ new SchemaOperator(
+ new ArrayList<>(), RouteMode.ALL_MATCH,
Duration.ofSeconds(30), behavior);
RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
RegularEventOperatorTestHarness.withDurationAndBehavior(
schemaOperator, 17, Duration.ofSeconds(3), behavior);
@@ -1033,7 +1038,8 @@ class SchemaEvolveTest {
SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE;
SchemaOperator schemaOperator =
- new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30),
behavior);
+ new SchemaOperator(
+ new ArrayList<>(), RouteMode.ALL_MATCH,
Duration.ofSeconds(30), behavior);
RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
RegularEventOperatorTestHarness.withDurationAndFineGrainedBehaviorWithError(
schemaOperator,
@@ -1131,7 +1137,8 @@ class SchemaEvolveTest {
SchemaChangeBehavior behavior = SchemaChangeBehavior.TRY_EVOLVE;
SchemaOperator schemaOperator =
- new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30),
behavior);
+ new SchemaOperator(
+ new ArrayList<>(), RouteMode.ALL_MATCH,
Duration.ofSeconds(30), behavior);
// All types of schema change events will be sent to the sink
// AddColumn and RenameColumn events will always fail
@@ -1461,7 +1468,8 @@ class SchemaEvolveTest {
SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE;
SchemaOperator schemaOperator =
- new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30),
behavior);
+ new SchemaOperator(
+ new ArrayList<>(), RouteMode.ALL_MATCH,
Duration.ofSeconds(30), behavior);
// All types of schema change events will be sent to the sink
// AddColumn and RenameColumn events will always fail
@@ -1795,7 +1803,8 @@ class SchemaEvolveTest {
SchemaChangeBehavior behavior = SchemaChangeBehavior.LENIENT;
SchemaOperator schemaOperator =
- new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30),
behavior);
+ new SchemaOperator(
+ new ArrayList<>(), RouteMode.ALL_MATCH,
Duration.ofSeconds(30), behavior);
RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
RegularEventOperatorTestHarness.withDurationAndBehavior(
schemaOperator, 17, Duration.ofSeconds(3), behavior);
@@ -2196,7 +2205,8 @@ class SchemaEvolveTest {
SchemaChangeBehavior behavior = SchemaChangeBehavior.LENIENT;
SchemaOperator schemaOperator =
- new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30),
behavior);
+ new SchemaOperator(
+ new ArrayList<>(), RouteMode.ALL_MATCH,
Duration.ofSeconds(30), behavior);
RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
RegularEventOperatorTestHarness.withDurationAndBehavior(
schemaOperator, 17, Duration.ofSeconds(3), behavior);
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorTest.java
index bd7ed4da0..b547a820b 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
@@ -122,7 +123,7 @@ class SchemaOperatorTest {
@Test
void testProcessSchemaChangeEventWithTimeOut() throws Exception {
SchemaOperator schemaOperator =
- new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(1));
+ new SchemaOperator(new ArrayList<>(), RouteMode.ALL_MATCH,
Duration.ofSeconds(1));
RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
RegularEventOperatorTestHarness.withDuration(
schemaOperator, 1, Duration.ofSeconds(3));
@@ -141,7 +142,7 @@ class SchemaOperatorTest {
@Test
void testProcessSchemaChangeEventWithOutTimeOut() throws Exception {
SchemaOperator schemaOperator =
- new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30));
+ new SchemaOperator(new ArrayList<>(), RouteMode.ALL_MATCH,
Duration.ofSeconds(30));
RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
RegularEventOperatorTestHarness.withDuration(
schemaOperator, 1, Duration.ofSeconds(3));
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java
index cbc1f407d..3e81ec82b 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.runtime.testutils.operators;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.runtime.operators.AbstractStreamOperatorAdapter;
@@ -99,6 +100,7 @@ public class DistributedEventOperatorTestHarness<
Executors.newFixedThreadPool(1),
new CollectingMetadataApplier(applyDuration),
new ArrayList<>(),
+ RouteMode.ALL_MATCH,
SchemaChangeBehavior.LENIENT,
rpcTimeout);
this.schemaRegistryGateway = new
TestingSchemaRegistryGateway(schemaCoordinator);
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java
index 8a3a21ba6..9b2eb2ee4 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java
@@ -24,6 +24,7 @@ import
org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.runtime.operators.AbstractStreamOperatorAdapter;
@@ -124,6 +125,7 @@ public class RegularEventOperatorTestHarness<
new CollectingMetadataApplier(
schemaEvolveDuration, enabledEventTypes,
errorsOnEventTypes),
new ArrayList<>(),
+ RouteMode.ALL_MATCH,
behavior,
rpcTimeout);
schemaRegistryGateway = new
TestingSchemaRegistryGateway(schemaRegistry);