This is an automated email from the ASF dual-hosted git repository.

yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new c9ffab779 [FLINK-38831][runtime] Route configuration support 
FIRST_MATCH mode (#4212)
c9ffab779 is described below

commit c9ffab7792a443478cca3b4fff985354b2520ca4
Author: SkylerLin <[email protected]>
AuthorDate: Tue Mar 17 19:32:13 2026 +0800

    [FLINK-38831][runtime] Route configuration support FIRST_MATCH mode (#4212)
---
 README.md                                          |   1 +
 docs/content.zh/docs/core-concept/data-pipeline.md |   1 +
 docs/content.zh/docs/core-concept/route.md         |  23 ++
 docs/content/docs/core-concept/data-pipeline.md    |   1 +
 docs/content/docs/core-concept/route.md            |  25 ++-
 .../parser/YamlPipelineDefinitionParserTest.java   |  65 ++++++
 .../pipeline-definition-with-route-mode.yaml       |  48 +++++
 .../flink/cdc/common/pipeline/PipelineOptions.java |  16 ++
 .../flink/cdc/common/pipeline/RouteMode.java       |  29 +++
 .../flink/cdc/common/route/TableIdRouter.java      |  65 ++++--
 .../flink/cdc/composer/definition/PipelineDef.java |   8 +
 .../cdc/composer/flink/FlinkPipelineComposer.java  |   6 +-
 .../flink/translator/SchemaOperatorTranslator.java |  40 +++-
 .../flink/FlinkPipelineComposerITCase.java         | 141 +++++++++++++
 .../flink/cdc/pipeline/tests/RouteE2eITCase.java   | 141 +++++++++++++
 .../operators/schema/common/SchemaRegistry.java    |   6 +-
 .../schema/distributed/SchemaCoordinator.java      |   3 +
 .../distributed/SchemaCoordinatorProvider.java     |   5 +
 .../schema/distributed/SchemaOperator.java         |   6 +-
 .../schema/distributed/SchemaOperatorFactory.java  |   9 +-
 .../schema/regular/BatchSchemaOperator.java        |  10 +-
 .../schema/regular/SchemaCoordinator.java          |   3 +
 .../schema/regular/SchemaCoordinatorProvider.java  |   5 +
 .../operators/schema/regular/SchemaOperator.java   |  15 +-
 .../schema/regular/SchemaOperatorFactory.java      |   9 +-
 .../flink/cdc/common/route/TableIdRouterTest.java  |   5 +-
 .../schema/common/SchemaDerivatorTest.java         |   4 +-
 .../operators/schema/common/SchemaTestBase.java    |   4 +-
 .../schema/common/TableIdRouterMatchModeTest.java  | 231 +++++++++++++++++++++
 .../schema/distributed/SchemaEvolveTest.java       |   4 +
 .../operators/schema/regular/SchemaEvolveTest.java |  28 ++-
 .../schema/regular/SchemaOperatorTest.java         |   5 +-
 .../DistributedEventOperatorTestHarness.java       |   2 +
 .../operators/RegularEventOperatorTestHarness.java |   2 +
 34 files changed, 921 insertions(+), 45 deletions(-)

diff --git a/README.md b/README.md
index ab17cfa66..8b818a923 100644
--- a/README.md
+++ b/README.md
@@ -100,6 +100,7 @@ SELECT * FROM test_table;
    pipeline:
      name: Sync MySQL Database to Doris
      parallelism: 2
+     route-mode: ALL_MATCH
      user-defined-function:
        - name: addone
          classpath: com.example.functions.AddOneFunctionClass
diff --git a/docs/content.zh/docs/core-concept/data-pipeline.md 
b/docs/content.zh/docs/core-concept/data-pipeline.md
index a2edc0854..6bde6e462 100644
--- a/docs/content.zh/docs/core-concept/data-pipeline.md
+++ b/docs/content.zh/docs/core-concept/data-pipeline.md
@@ -118,6 +118,7 @@ under the License.
 | `parallelism`                 | pipeline的全局并发度,默认值是1。                        
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | `local-time-zone`             | 作业级别的本地时区。                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | `execution.runtime-mode`      | pipeline 的运行模式,包含 STREAMING 和 BATCH,默认值是 
STREAMING。                                                                      
                                                                                
                                                                                
                                                                                
                                                                                
                  [...]
+| `route-mode`                  | [路由]({{< ref "docs/core-concept/route" 
>}}#路由模式)规则的匹配模式。可选值:`ALL_MATCH`(默认,应用所有匹配的规则)或 `FIRST_MATCH`(只应用第一个匹配的规则)。     
                                                                                
                                                                                
                                                                                
                                                               | optional       
   |
 | `schema.change.behavior`      | 如何处理 [schema 变更]({{< ref 
"docs/core-concept/schema-evolution" >}})。可选值:[`exception`]({{< ref 
"docs/core-concept/schema-evolution" >}}#exception-mode)、[`evolve`]({{< ref 
"docs/core-concept/schema-evolution" >}}#evolve-mode)、[`try_evolve`]({{< ref 
"docs/core-concept/schema-evolution" >}}#tryevolve-mode)、[`lenient`]({{< ref 
"docs/core-concept/schema-evolution" >}}#lenient-mode)(默认值)或 [`ignore`]({{< ref 
"docs/core-concept/schema-evolution" >}}#ignore-mode)。  [...]
 | `schema.operator.uid`         | Schema 算子的唯一 ID。此 ID 
用于算子间通信,必须在所有算子中保持唯一。**已废弃**:请使用 `operator.uid.prefix` 代替。                      
                                                                                
                                                                                
                                                                                
                                                                                
                                      [...]
 | `schema-operator.rpc-timeout` | SchemaOperator 等待下游 SchemaChangeEvent 
应用完成的超时时间,默认值是 3 分钟。                                                            
                                                                                
                                                                                
                                                                                
                                                                                
                     [...]
diff --git a/docs/content.zh/docs/core-concept/route.md 
b/docs/content.zh/docs/core-concept/route.md
index 774915d3d..88428f853 100644
--- a/docs/content.zh/docs/core-concept/route.md
+++ b/docs/content.zh/docs/core-concept/route.md
@@ -39,6 +39,29 @@ under the License.
 
 一个 Route 模块可以包含一个或多个 source-table/sink-table 规则。
 
+# 路由模式
+默认情况下,所有匹配的路由规则都会被应用到表上。你可以在 pipeline 配置中通过 `route-mode` 选项来改变这一行为:
+
+| 值            | 描述                                              |
+|--------------|-------------------------------------------------|
+| `ALL_MATCH`  | 应用所有匹配的路由规则到表上。这是默认模式。                          |
+| `FIRST_MATCH`| 只应用第一个匹配的路由规则,并停止后续规则的计算。                       |
+
+例如,使用 `FIRST_MATCH` 模式:
+
+```yaml
+pipeline:
+  name: Sync MySQL Database to Doris
+  parallelism: 2
+  route-mode: FIRST_MATCH
+```
+
+{{< hint info >}}
+
+当使用 `FIRST_MATCH` 模式时,路由规则会按照定义的顺序进行计算。第一个匹配源表的规则会被应用,后续的规则将被跳过。
+
+{{< /hint >}}
+
 # 示例
 ## 路由一个 Data Source 表到一个 Data Sink 表
 如果同步一个 `mydb` 数据库中的 `web_order` 表到一个相同库的 `ods_web_order` 表,我们可以使用下面的 yaml 
文件来定义这个路由:
diff --git a/docs/content/docs/core-concept/data-pipeline.md 
b/docs/content/docs/core-concept/data-pipeline.md
index e8ab92624..2a9a27b50 100644
--- a/docs/content/docs/core-concept/data-pipeline.md
+++ b/docs/content/docs/core-concept/data-pipeline.md
@@ -120,6 +120,7 @@ Note that whilst the parameters are each individually 
optional, at least one of
 | `parallelism`                 | The global parallelism of the pipeline. 
Defaults to 1.                                                                  
                                                                                
                                                                                
                                                                                
                                                                                
                   [...]
 | `local-time-zone`             | The local time zone defines current session 
time zone id.                                                                   
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
 | `execution.runtime-mode`      | The runtime mode of the pipeline includes 
STREAMING and BATCH, with the default value being STREAMING.                    
                                                                                
                                                                                
                                                                                
                                                                                
                 [...]
+| `route-mode`                  | The matching mode for [route]({{< ref 
"docs/core-concept/route" >}}#route-mode) rules. One of: `ALL_MATCH` (default, 
apply all matching rules) or `FIRST_MATCH` (apply only the first matching 
rule).                                                                          
                                                                                
                                                                                
                            [...]
 | `schema.change.behavior`      | How to handle [changes in schema]({{< ref 
"docs/core-concept/schema-evolution" >}}). One of: [`exception`]({{< ref 
"docs/core-concept/schema-evolution" >}}#exception-mode), [`evolve`]({{< ref 
"docs/core-concept/schema-evolution" >}}#evolve-mode), [`try_evolve`]({{< ref 
"docs/core-concept/schema-evolution" >}}#tryevolve-mode), [`lenient`]({{< ref 
"docs/core-concept/schema-evolution" >}}#lenient-mode) (default) or 
[`ignore`]({{< ref "docs/core-concept/sche [...]
 | `schema.operator.uid`         | The unique ID for schema operator. This ID 
will be used for inter-operator communications and must be unique across 
operators. **Deprecated**: use `operator.uid.prefix` instead.                   
                                                                                
                                                                                
                                                                                
                       [...]
 | `schema-operator.rpc-timeout` | The timeout time for SchemaOperator to wait 
downstream SchemaChangeEvent applying finished, the default value is 3 minutes. 
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
diff --git a/docs/content/docs/core-concept/route.md 
b/docs/content/docs/core-concept/route.md
index 394f5a56a..2c1de4435 100644
--- a/docs/content/docs/core-concept/route.md
+++ b/docs/content/docs/core-concept/route.md
@@ -28,7 +28,7 @@ under the License.
 **Route** specifies the rule of matching a list of source-table and mapping to 
sink-table. The most typical scenario is the merge of sub-databases and 
sub-tables, routing multiple upstream source tables to the same sink table.
 
 # Parameters
-To describe a route, the follows are required:  
+To describe a route, the follows are required:
 
 | parameter      | meaning                                                     
                                | optional/required |
 
|----------------|---------------------------------------------------------------------------------------------|-------------------|
@@ -39,6 +39,29 @@ To describe a route, the follows are required:
 
 A route module can contain a list of source-table/sink-table rules.
 
+# Route Mode
+By default, all matching route rules are applied to a table. You can configure 
the `route-mode` option in the pipeline configuration to change this behavior:
+
+| Value        | Description                                                   
         |
+|--------------|------------------------------------------------------------------------|
+| `ALL_MATCH`  | Apply all matching route rules to a table. This is the 
default mode.  |
+| `FIRST_MATCH`| Apply only the first matching route rule and stop evaluation. 
         |
+
+For example, to use `FIRST_MATCH` mode:
+
+```yaml
+pipeline:
+  name: Sync MySQL Database to Doris
+  parallelism: 2
+  route-mode: FIRST_MATCH
+```
+
+{{< hint info >}}
+
+When using `FIRST_MATCH` mode, route rules are evaluated in the order they are 
defined. The first rule that matches the source table will be applied, and 
subsequent rules will be skipped.
+
+{{< /hint >}}
+
 # Example
 ## Route one Data Source table to one Data Sink table
 if synchronize the table `web_order` in the database `mydb` to a Doris table 
`ods_web_order`, we can use this yaml file to define this route:
diff --git 
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
 
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index bbfafcf8e..8349c2977 100644
--- 
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++ 
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -206,6 +206,15 @@ class YamlPipelineDefinitionParserTest {
         assertThat(pipelineDef).isEqualTo(pipelineDefWithUdfOptions);
     }
 
+    @Test
+    void testRouteMode() throws Exception {
+        URL resource =
+                
Resources.getResource("definitions/pipeline-definition-with-route-mode.yaml");
+        YamlPipelineDefinitionParser parser = new 
YamlPipelineDefinitionParser();
+        PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new 
Configuration());
+        assertThat(pipelineDef).isEqualTo(pipelineDefWithRouteMode);
+    }
+
     @Test
     void testSchemaEvolutionTypesConfiguration() throws Exception {
         testSchemaEvolutionTypesParsing(
@@ -716,4 +725,60 @@ class YamlPipelineDefinitionParserTest {
                             ImmutableMap.<String, String>builder()
                                     .put("parallelism", "1")
                                     .build()));
+
+    private final PipelineDef pipelineDefWithRouteMode =
+            new PipelineDef(
+                    new SourceDef(
+                            "mysql",
+                            null,
+                            Configuration.fromMap(
+                                    ImmutableMap.<String, String>builder()
+                                            .put("hostname", "localhost")
+                                            .put("port", "3306")
+                                            .put("username", "root")
+                                            .put("password", "123456")
+                                            .put("tables", "mydb.\\.*")
+                                            .put("server-id", "5400-5404")
+                                            .put("server-time-zone", "UTC")
+                                            .build())),
+                    new SinkDef(
+                            "doris",
+                            null,
+                            Configuration.fromMap(
+                                    ImmutableMap.<String, String>builder()
+                                            .put("fenodes", "127.0.0.1:8030")
+                                            .put("username", "root")
+                                            .put("password", "")
+                                            .build()),
+                            ImmutableSet.of(
+                                    DROP_COLUMN,
+                                    ALTER_COLUMN_TYPE,
+                                    ADD_COLUMN,
+                                    CREATE_TABLE,
+                                    RENAME_COLUMN)),
+                    Arrays.asList(
+                            new RouteDef(
+                                    "mydb.order_.*",
+                                    "ods_db.ods_orders",
+                                    null,
+                                    "Merge all order sharded tables"),
+                            new RouteDef(
+                                    "mydb.product_.*",
+                                    "ods_db.ods_products",
+                                    null,
+                                    "Merge all product sharded tables"),
+                            new RouteDef(
+                                    "mydb.*",
+                                    "ods_db.ods_<>",
+                                    "<>",
+                                    "One-to-one mapping for other tables")),
+                    Collections.emptyList(),
+                    Collections.emptyList(),
+                    Collections.emptyList(),
+                    Configuration.fromMap(
+                            ImmutableMap.<String, String>builder()
+                                    .put("name", 
"mysql_to_doris_with_route_match_mode")
+                                    .put("parallelism", "2")
+                                    .put("route-mode", "FIRST_MATCH")
+                                    .build()));
 }
diff --git 
a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-route-mode.yaml
 
b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-route-mode.yaml
new file mode 100644
index 000000000..94be52075
--- /dev/null
+++ 
b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-route-mode.yaml
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Example pipeline definition with route match-mode
+source:
+  type: mysql
+  hostname: localhost
+  port: 3306
+  username: root
+  password: 123456
+  tables: mydb.\.*
+  server-id: 5400-5404
+  server-time-zone: UTC
+
+sink:
+  type: doris
+  fenodes: 127.0.0.1:8030
+  username: root
+  password: ""
+
+route:
+  - source-table: mydb.order_.*
+    sink-table: ods_db.ods_orders
+    description: "Merge all order sharded tables"
+  - source-table: mydb.product_.*
+    sink-table: ods_db.ods_products
+    description: "Merge all product sharded tables"
+  - source-table: mydb.*
+    sink-table: ods_db.ods_<>
+    replace-symbol: <>
+    description: "One-to-one mapping for other tables"
+
+pipeline:
+  name: mysql_to_doris_with_route_match_mode
+  parallelism: 2
+  route-mode: FIRST_MATCH
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
index f27c37f7a..443d6145b 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
@@ -72,6 +72,22 @@ public class PipelineOptions {
                                                             "EXCEPTION: Throw 
an exception to terminate the sync pipeline.")))
                                     .build());
 
+    public static final ConfigOption<RouteMode> PIPELINE_ROUTE_MODE =
+            ConfigOptions.key("route-mode")
+                    .enumType(RouteMode.class)
+                    .defaultValue(RouteMode.ALL_MATCH)
+                    .withDescription(
+                            Description.builder()
+                                    .text("Match mode for routing rules. ")
+                                    .linebreak()
+                                    .add(
+                                            ListElement.list(
+                                                    text(
+                                                            "ALL_MATCH: Apply 
all matching route rules to a table."),
+                                                    text(
+                                                            "FIRST_MATCH: 
Apply only the first matching route rule and stop evaluation.")))
+                                    .build());
+
     public static final ConfigOption<String> PIPELINE_LOCAL_TIME_ZONE =
             ConfigOptions.key("local-time-zone")
                     .stringType()
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/RouteMode.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/RouteMode.java
new file mode 100644
index 000000000..bd39b0006
--- /dev/null
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/RouteMode.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.pipeline;
+
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+
+/** Route mode for routing rules. */
+@PublicEvolving
+public enum RouteMode {
+    /** Match all applicable routing rules. */
+    ALL_MATCH,
+    /** Match only the first applicable routing rule. */
+    FIRST_MATCH;
+}
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java
index 06126b88a..9c9d7d016 100755
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.common.route;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.cdc.common.annotation.PublicEvolving;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.schema.Selectors;
 
 import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
@@ -34,7 +35,10 @@ import javax.annotation.Nonnull;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -53,6 +57,7 @@ public class TableIdRouter {
 
     private final List<Tuple3<Pattern, String, String>> routes;
     private final LoadingCache<TableId, List<TableId>> routingCache;
+    private final RouteMode routeMode;
 
     private static final String DOT_PLACEHOLDER = "_dot_placeholder_";
 
@@ -106,6 +111,11 @@ public class TableIdRouter {
     }
 
     public TableIdRouter(List<RouteRule> routingRules) {
+        this(routingRules, RouteMode.ALL_MATCH);
+    }
+
+    public TableIdRouter(List<RouteRule> routingRules, RouteMode routeMode) {
+        this.routeMode = routeMode;
         this.routes = new ArrayList<>();
         for (RouteRule rule : routingRules) {
             try {
@@ -139,11 +149,19 @@ public class TableIdRouter {
     }
 
     private List<TableId> calculateRoute(TableId sourceTableId) {
-        List<TableId> routedTableIds =
-                routes.stream()
-                        .filter(route -> matches(route.f0, sourceTableId))
-                        .map(route -> resolveReplacement(sourceTableId, route))
-                        .collect(Collectors.toList());
+        List<TableId> routedTableIds = new ArrayList<>();
+
+        for (Tuple3<Pattern, String, String> route : routes) {
+            if (matches(route.f0, sourceTableId)) {
+                routedTableIds.add(resolveReplacement(sourceTableId, route));
+
+                // If match mode is FIRST_MATCH, stop after the first match
+                if (routeMode == RouteMode.FIRST_MATCH) {
+                    break;
+                }
+            }
+        }
+
         if (routedTableIds.isEmpty()) {
             routedTableIds.add(sourceTableId);
         }
@@ -177,13 +195,36 @@ public class TableIdRouter {
         if (routes.isEmpty()) {
             return new ArrayList<>();
         }
-        return routes.stream()
-                .map(
-                        route ->
-                                tableIdSet.stream()
-                                        .filter(tableId -> matches(route.f0, 
tableId))
-                                        .collect(Collectors.toSet()))
-                .collect(Collectors.toList());
+
+        if (routeMode == RouteMode.ALL_MATCH) {
+            return routes.stream()
+                    .map(
+                            route ->
+                                    tableIdSet.stream()
+                                            .filter(tableId -> 
matches(route.f0, tableId))
+                                            .collect(Collectors.toSet()))
+                    .collect(Collectors.toList());
+        } else if (routeMode == RouteMode.FIRST_MATCH) {
+            Map<TableId, Integer> matchingTableIds = new HashMap<>();
+            for (TableId tableId : tableIdSet) {
+                for (int i = 0; i < routes.size(); i++) {
+                    if 
(routes.get(i).f0.matcher(tableId.toString()).matches()) {
+                        matchingTableIds.put(tableId, i);
+                        break;
+                    }
+                }
+            }
+            List<Set<TableId>> routeGroups = new ArrayList<>(routes.size());
+            for (int i = 0; i < routes.size(); i++) {
+                routeGroups.add(new HashSet<>());
+            }
+            for (Map.Entry<TableId, Integer> entry : 
matchingTableIds.entrySet()) {
+                routeGroups.get(entry.getValue()).add(entry.getKey());
+            }
+            return routeGroups;
+        } else {
+            throw new IllegalArgumentException("Unexpected route mode: " + 
routeMode);
+        }
     }
 
     private static boolean matches(Pattern pattern, TableId tableId) {
diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java
index 985613e13..de31732b2 100644
--- 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.composer.definition;
 
 import org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.pipeline.RuntimeExecutionMode;
 import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
 import org.apache.flink.cdc.composer.PipelineComposer;
@@ -32,6 +33,7 @@ import java.util.TimeZone;
 
 import static 
org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_EXECUTION_RUNTIME_MODE;
 import static 
org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
+import static 
org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_ROUTE_MODE;
 
 /**
  * Definition of a pipeline.
@@ -100,6 +102,10 @@ public class PipelineDef {
         return routes;
     }
 
+    public RouteMode getRouteMode() {
+        return config.get(PIPELINE_ROUTE_MODE);
+    }
+
     public List<TransformDef> getTransforms() {
         return transforms;
     }
@@ -125,6 +131,8 @@ public class PipelineDef {
                 + sink
                 + ", routes="
                 + routes
+                + ", routeMode="
+                + getRouteMode()
                 + ", transforms="
                 + transforms
                 + ", udfs="
diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index 6de9045af..1854b0503 100644
--- 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -220,7 +220,8 @@ public class FlinkPipelineComposer implements 
PipelineComposer {
                                             pipelineDef
                                                     .getSink()
                                                     
.getIncludedSchemaEvolutionTypes()),
-                            pipelineDef.getRoute());
+                            pipelineDef.getRoute(),
+                            pipelineDef.getRouteMode());
 
         } else {
             // Translate a regular topology for sources without distributed 
tables
@@ -235,7 +236,8 @@ public class FlinkPipelineComposer implements 
PipelineComposer {
                                             pipelineDef
                                                     .getSink()
                                                     
.getIncludedSchemaEvolutionTypes()),
-                            pipelineDef.getRoute());
+                            pipelineDef.getRoute(),
+                            pipelineDef.getRouteMode());
 
             // Schema Operator ---(shuffled)---> Partitioning
             stream =
diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
index 8f31803b3..7340b3e80 100644
--- 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
@@ -18,7 +18,9 @@
 package org.apache.flink.cdc.composer.flink.translator;
 
 import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -55,29 +57,44 @@ public class SchemaOperatorTranslator {
         this.timezone = timezone;
     }
 
+    @VisibleForTesting
     public DataStream<Event> translateRegular(
             DataStream<Event> input,
             int parallelism,
             MetadataApplier metadataApplier,
             List<RouteDef> routes) {
-        return translateRegular(input, parallelism, false, metadataApplier, 
routes);
+        return translateRegular(
+                input, parallelism, false, metadataApplier, routes, 
RouteMode.ALL_MATCH);
     }
 
+    @VisibleForTesting
     public DataStream<Event> translateRegular(
             DataStream<Event> input,
             int parallelism,
             boolean isBatchMode,
             MetadataApplier metadataApplier,
             List<RouteDef> routes) {
+        return translateRegular(
+                input, parallelism, isBatchMode, metadataApplier, routes, 
RouteMode.ALL_MATCH);
+    }
+
+    public DataStream<Event> translateRegular(
+            DataStream<Event> input,
+            int parallelism,
+            boolean isBatchMode,
+            MetadataApplier metadataApplier,
+            List<RouteDef> routes,
+            RouteMode routeMode) {
 
         return isBatchMode
                 ? addRegularSchemaBatchOperator(
-                        input, parallelism, metadataApplier, routes, timezone)
+                        input, parallelism, metadataApplier, routes, 
routeMode, timezone)
                 : addRegularSchemaOperator(
                         input,
                         parallelism,
                         metadataApplier,
                         routes,
+                        routeMode,
                         schemaChangeBehavior,
                         timezone);
     }
@@ -86,9 +103,16 @@ public class SchemaOperatorTranslator {
             DataStream<PartitioningEvent> input,
             int parallelism,
             MetadataApplier metadataApplier,
-            List<RouteDef> routes) {
+            List<RouteDef> routes,
+            RouteMode routeMode) {
         return addDistributedSchemaOperator(
-                input, parallelism, metadataApplier, routes, 
schemaChangeBehavior, timezone);
+                input,
+                parallelism,
+                metadataApplier,
+                routes,
+                routeMode,
+                schemaChangeBehavior,
+                timezone);
     }
 
     @Deprecated
@@ -101,6 +125,7 @@ public class SchemaOperatorTranslator {
             int parallelism,
             MetadataApplier metadataApplier,
             List<RouteDef> routes,
+            RouteMode routeMode,
             SchemaChangeBehavior schemaChangeBehavior,
             String timezone) {
         List<RouteRule> routingRules = new ArrayList<>();
@@ -118,6 +143,7 @@ public class SchemaOperatorTranslator {
                         new SchemaOperatorFactory(
                                 metadataApplier,
                                 routingRules,
+                                routeMode,
                                 rpcTimeOut,
                                 schemaChangeBehavior,
                                 timezone));
@@ -130,6 +156,7 @@ public class SchemaOperatorTranslator {
             int parallelism,
             MetadataApplier metadataApplier,
             List<RouteDef> routes,
+            RouteMode routeMode,
             String timezone) {
         List<RouteRule> routingRules = new ArrayList<>();
         for (RouteDef route : routes) {
@@ -143,7 +170,8 @@ public class SchemaOperatorTranslator {
                 input.transform(
                         "SchemaBatchOperator",
                         new EventTypeInfo(),
-                        new BatchSchemaOperator(routingRules, metadataApplier, 
timezone));
+                        new BatchSchemaOperator(
+                                routingRules, routeMode, metadataApplier, 
timezone));
         stream.uid(schemaOperatorUid).setParallelism(parallelism);
         return stream;
     }
@@ -153,6 +181,7 @@ public class SchemaOperatorTranslator {
             int parallelism,
             MetadataApplier metadataApplier,
             List<RouteDef> routes,
+            RouteMode routeMode,
             SchemaChangeBehavior schemaChangeBehavior,
             String timezone) {
         Preconditions.checkArgument(
@@ -181,6 +210,7 @@ public class SchemaOperatorTranslator {
                                 .SchemaOperatorFactory(
                                 metadataApplier,
                                 routingRules,
+                                routeMode,
                                 rpcTimeOut,
                                 schemaChangeBehavior,
                                 timezone))
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index d01d987d4..7a01193c3 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
@@ -1588,6 +1589,146 @@ class FlinkPipelineComposerITCase {
         return events;
     }
 
+    @ParameterizedTest
+    @EnumSource
+    void testRouteModeFirstMatch(ValuesDataSink.SinkApi sinkApi) throws 
Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES);
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", 
sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value 
Sink", sinkConfig);
+
+        TableId routedTable1 = TableId.tableId("default_namespace", 
"default_schema", "routed1");
+        TableId routedTable2 = TableId.tableId("default_namespace", 
"default_schema", "routed2");
+        TableId routedAll = TableId.tableId("default_namespace", 
"default_schema", "routed_all");
+        List<RouteDef> routeDef =
+                Arrays.asList(
+                        new RouteDef(TABLE_1.toString(), 
routedTable1.toString(), null, null),
+                        new RouteDef(TABLE_2.toString(), 
routedTable2.toString(), null, null),
+                        new RouteDef(
+                                "default_namespace.default_schema.table\\.*",
+                                routedAll.toString(),
+                                null,
+                                null));
+
+        // Setup pipeline with first-match route mode
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
+        pipelineConfig.set(PipelineOptions.PIPELINE_ROUTE_MODE, 
RouteMode.FIRST_MATCH);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        routeDef,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check result in ValuesDatabase
+        // With first-match mode, tables should only be routed to their first 
matching route
+        List<String> routed1Results = ValuesDatabase.getResults(routedTable1);
+        assertThat(routed1Results)
+                .contains(
+                        
"default_namespace.default_schema.routed1:col1=2;newCol3=x",
+                        
"default_namespace.default_schema.routed1:col1=3;newCol3=");
+        List<String> routed2Results = ValuesDatabase.getResults(routedTable2);
+        assertThat(routed2Results)
+                .contains(
+                        
"default_namespace.default_schema.routed2:col1=1;col2=1",
+                        
"default_namespace.default_schema.routed2:col1=2;col2=2",
+                        
"default_namespace.default_schema.routed2:col1=3;col2=3");
+
+        List<String> allResults = ValuesDatabase.getAllResults();
+        assertThat(allResults).noneMatch(result -> 
result.startsWith(routedAll.toString()));
+    }
+
+    @ParameterizedTest
+    @EnumSource
+    void testRouteModeAllMatch(ValuesDataSink.SinkApi sinkApi) throws 
Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES);
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", 
sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value 
Sink", sinkConfig);
+
+        TableId routedTable1 = TableId.tableId("default_namespace", 
"default_schema", "routed1");
+        TableId routedTable2 = TableId.tableId("default_namespace", 
"default_schema", "routed2");
+        TableId routedAll = TableId.tableId("default_namespace", 
"default_schema", "routed_all");
+        List<RouteDef> routeDef =
+                Arrays.asList(
+                        new RouteDef(TABLE_1.toString(), 
routedTable1.toString(), null, null),
+                        new RouteDef(TABLE_2.toString(), 
routedTable2.toString(), null, null),
+                        new RouteDef(
+                                "default_namespace.default_schema.table\\.*",
+                                routedAll.toString(),
+                                null,
+                                null));
+
+        // Setup pipeline with all-match route mode
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
+        pipelineConfig.set(PipelineOptions.PIPELINE_ROUTE_MODE, 
RouteMode.ALL_MATCH);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        routeDef,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check result in ValuesDatabase
+        // With all-match mode, tables should be routed to all matching routes
+        List<String> routed1Results = ValuesDatabase.getResults(routedTable1);
+        assertThat(routed1Results)
+                .contains(
+                        
"default_namespace.default_schema.routed1:col1=2;newCol3=x",
+                        
"default_namespace.default_schema.routed1:col1=3;newCol3=");
+        List<String> routed2Results = ValuesDatabase.getResults(routedTable2);
+        assertThat(routed2Results)
+                .contains(
+                        
"default_namespace.default_schema.routed2:col1=1;col2=1",
+                        
"default_namespace.default_schema.routed2:col1=2;col2=2",
+                        
"default_namespace.default_schema.routed2:col1=3;col2=3");
+
+        List<String> routedAllResults = ValuesDatabase.getResults(routedAll);
+        assertThat(routedAllResults).isNotEmpty();
+        assertThat(routedAllResults.stream().filter(s -> 
s.contains("routed_all")).count())
+                .isGreaterThan(0);
+    }
+
     BinaryRecordData generate(Schema schema, Object... fields) {
         return (new 
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
                 .generate(
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
index c9169388d..870ddfa72 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java
@@ -1183,4 +1183,145 @@ class RouteE2eITCase extends PipelineTestEnvironment {
                         .toArray(String[]::new));
         extremeRouteTestDatabase.dropDatabase();
     }
+
+    @ParameterizedTest(name = "isFirstMatch: {0}")
+    @ValueSource(booleans = {true, false})
+    void testMultipleRouteWithRouteMode(boolean isFirstMatch) throws Exception 
{
+        String pipelineJob =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: %s\n"
+                                + "  port: 3306\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.\\.*\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "\n"
+                                + "sink:\n"
+                                + "  type: values\n"
+                                + "route:\n"
+                                + "  - source-table: 
%s.(TABLEALPHA|TABLEBETA)\n"
+                                + "    sink-table: NEW_%s.ALPHABET\n"
+                                + "  - source-table: 
%s.(TABLEBETA|TABLEGAMMA)\n"
+                                + "    sink-table: NEW_%s.BETAGAMM\n"
+                                + "\n"
+                                + "pipeline:\n"
+                                + "  route-mode: %s\n"
+                                + "  parallelism: %d",
+                        INTER_CONTAINER_MYSQL_ALIAS,
+                        MYSQL_TEST_USER,
+                        MYSQL_TEST_PASSWORD,
+                        routeTestDatabase.getDatabaseName(),
+                        routeTestDatabase.getDatabaseName(),
+                        routeTestDatabase.getDatabaseName(),
+                        routeTestDatabase.getDatabaseName(),
+                        routeTestDatabase.getDatabaseName(),
+                        isFirstMatch ? "FIRST_MATCH" : "ALL_MATCH",
+                        parallelism);
+        submitPipelineJob(pipelineJob);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        if (isFirstMatch) {
+            validateResult(
+                    routeDbNameFormatter,
+                    "CreateTableEvent{tableId=NEW_%s.ALPHABET, 
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, 
options=()}",
+                    "CreateTableEvent{tableId=NEW_%s.BETAGAMM, 
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, 
options=()}",
+                    "CreateTableEvent{tableId=%s.TABLEDELTA, 
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, 
options=()}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[1008, 8], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[1009, 8.1], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[1010, 10], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[1011, 11], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[2011, 11], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[2012, 12], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[2013, 13], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[2014, 14], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], 
after=[3015, Amber], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], 
after=[3016, Black], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], 
after=[3017, Cyan], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], 
after=[3018, Denim], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], 
after=[4019, Yosemite], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], 
after=[4020, El Capitan], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], 
after=[4021, Sierra], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], 
after=[4022, High Sierra], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], 
after=[4023, Mojave], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], 
after=[4024, Catalina], op=INSERT, meta=()}");
+
+        } else {
+            validateResult(
+                    routeDbNameFormatter,
+                    "CreateTableEvent{tableId=NEW_%s.ALPHABET, 
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, 
options=()}",
+                    "CreateTableEvent{tableId=NEW_%s.BETAGAMM, 
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, 
options=()}",
+                    "CreateTableEvent{tableId=%s.TABLEDELTA, 
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, 
options=()}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[1008, 8], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[1009, 8.1], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[1010, 10], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[1011, 11], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[2011, 11], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[2012, 12], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[2013, 13], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[2014, 14], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], 
after=[2011, 11], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], 
after=[2012, 12], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], 
after=[2013, 13], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], 
after=[2014, 14], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], 
after=[3015, Amber], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], 
after=[3016, Black], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], 
after=[3017, Cyan], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], 
after=[3018, Denim], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], 
after=[4019, Yosemite], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], 
after=[4020, El Capitan], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], 
after=[4021, Sierra], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], 
after=[4022, High Sierra], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], 
after=[4023, Mojave], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], 
after=[4024, Catalina], op=INSERT, meta=()}");
+        }
+
+        generateIncrementalChanges();
+        if (isFirstMatch) {
+            validateResult(
+                    routeDbNameFormatter,
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[3007, 7], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[2014, 
14], after=[2014, 2014], op=UPDATE, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], 
after=[3019, Emerald], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=%s.TABLEDELTA, before=[4024, 
Catalina], after=[], op=DELETE, meta=()}");
+
+        } else {
+            validateResult(
+                    routeDbNameFormatter,
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[3007, 7], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[2014, 
14], after=[2014, 2014], op=UPDATE, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[2014, 
14], after=[2014, 2014], op=UPDATE, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], 
after=[3019, Emerald], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=%s.TABLEDELTA, before=[4024, 
Catalina], after=[], op=DELETE, meta=()}");
+        }
+
+        generateSchemaChanges();
+
+        if (isFirstMatch) {
+            validateResult(
+                    routeDbNameFormatter,
+                    "AddColumnEvent{tableId=NEW_%s.ALPHABET, 
addedColumns=[ColumnWithPosition{column=`NAME` VARCHAR(17), position=AFTER, 
existedColumnName=VERSION}]}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[10001, 12, Derrida], op=INSERT, meta=()}",
+                    "AddColumnEvent{tableId=NEW_%s.ALPHABET, 
addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), 
position=AFTER, existedColumnName=NAME}]}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[10002, null, null, 15], op=INSERT, meta=()}",
+                    "RenameColumnEvent{tableId=NEW_%s.BETAGAMM, 
nameMapping={VERSION=VERSION_EX}}",
+                    "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], 
after=[10003, Fluorite], op=INSERT, meta=()}",
+                    "DropColumnEvent{tableId=%s.TABLEDELTA, 
droppedColumnNames=[VERSION]}",
+                    "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], 
after=[10004], op=INSERT, meta=()}");
+        } else {
+            validateResult(
+                    routeDbNameFormatter,
+                    "AddColumnEvent{tableId=NEW_%s.ALPHABET, 
addedColumns=[ColumnWithPosition{column=`NAME` VARCHAR(17), position=AFTER, 
existedColumnName=VERSION}]}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[10001, 12, Derrida], op=INSERT, meta=()}",
+                    "AddColumnEvent{tableId=NEW_%s.ALPHABET, 
addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), 
position=AFTER, existedColumnName=NAME}]}",
+                    "AddColumnEvent{tableId=NEW_%s.BETAGAMM, 
addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), 
position=AFTER, existedColumnName=VERSION}]}",
+                    "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], 
after=[10002, null, null, 15], op=INSERT, meta=()}",
+                    "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], 
after=[10002, null, 15], op=INSERT, meta=()}",
+                    "AlterColumnTypeEvent{tableId=NEW_%s.BETAGAMM, 
typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}}",
+                    "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], 
after=[10003, null, Fluorite], op=INSERT, meta=()}",
+                    "DropColumnEvent{tableId=%s.TABLEDELTA, 
droppedColumnNames=[VERSION]}",
+                    "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], 
after=[10004], op=INSERT, meta=()}");
+        }
+    }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
index 640fb765c..af0cd6f4c 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.runtime.operators.schema.common;
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.route.TableIdRouter;
@@ -87,6 +88,7 @@ public abstract class SchemaRegistry implements 
OperatorCoordinator, Coordinatio
     protected final MetadataApplier metadataApplier;
     protected final Duration rpcTimeout;
     protected final List<RouteRule> routingRules;
+    protected final RouteMode routeMode;
     protected final SchemaChangeBehavior behavior;
 
     // -------------------------
@@ -104,6 +106,7 @@ public abstract class SchemaRegistry implements 
OperatorCoordinator, Coordinatio
             ExecutorService coordinatorExecutor,
             MetadataApplier metadataApplier,
             List<RouteRule> routingRules,
+            RouteMode routeMode,
             SchemaChangeBehavior schemaChangeBehavior,
             Duration rpcTimeout) {
         this.context = context;
@@ -111,6 +114,7 @@ public abstract class SchemaRegistry implements 
OperatorCoordinator, Coordinatio
         this.coordinatorExecutor = coordinatorExecutor;
         this.metadataApplier = metadataApplier;
         this.routingRules = routingRules;
+        this.routeMode = routeMode;
         this.rpcTimeout = rpcTimeout;
         this.behavior = schemaChangeBehavior;
     }
@@ -127,7 +131,7 @@ public abstract class SchemaRegistry implements 
OperatorCoordinator, Coordinatio
         if (this.schemaManager == null) {
             this.schemaManager = new SchemaManager();
         }
-        this.router = new TableIdRouter(routingRules);
+        this.router = new TableIdRouter(routingRules, routeMode);
     }
 
     @Override
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java
index 8626eafbc..f9aad19b8 100755
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.schema.Schema;
@@ -109,6 +110,7 @@ public class SchemaCoordinator extends SchemaRegistry {
             ExecutorService coordinatorExecutor,
             MetadataApplier metadataApplier,
             List<RouteRule> routingRules,
+            RouteMode routeMode,
             SchemaChangeBehavior schemaChangeBehavior,
             Duration rpcTimeout) {
         super(
@@ -117,6 +119,7 @@ public class SchemaCoordinator extends SchemaRegistry {
                 coordinatorExecutor,
                 metadataApplier,
                 routingRules,
+                routeMode,
                 schemaChangeBehavior,
                 rpcTimeout);
         this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinatorProvider.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinatorProvider.java
index f028f4cc9..a5cf25a7f 100755
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinatorProvider.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinatorProvider.java
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.runtime.operators.schema.distributed;
 
 import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -39,6 +40,7 @@ public class SchemaCoordinatorProvider implements 
OperatorCoordinator.Provider {
     private final String operatorName;
     private final MetadataApplier metadataApplier;
     private final List<RouteRule> routingRules;
+    private final RouteMode routeMode;
     private final SchemaChangeBehavior schemaChangeBehavior;
     private final Duration rpcTimeout;
 
@@ -47,12 +49,14 @@ public class SchemaCoordinatorProvider implements 
OperatorCoordinator.Provider {
             String operatorName,
             MetadataApplier metadataApplier,
             List<RouteRule> routingRules,
+            RouteMode routeMode,
             SchemaChangeBehavior schemaChangeBehavior,
             Duration rpcTimeout) {
         this.operatorID = operatorID;
         this.operatorName = operatorName;
         this.metadataApplier = metadataApplier;
         this.routingRules = routingRules;
+        this.routeMode = routeMode;
         this.schemaChangeBehavior = schemaChangeBehavior;
         this.rpcTimeout = rpcTimeout;
     }
@@ -75,6 +79,7 @@ public class SchemaCoordinatorProvider implements 
OperatorCoordinator.Provider {
                 coordinatorExecutor,
                 metadataApplier,
                 routingRules,
+                routeMode,
                 schemaChangeBehavior,
                 rpcTimeout);
     }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
index 9ec2b2fc9..7964efd3f 100755
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
@@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.event.FlushEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.route.TableIdRouter;
@@ -71,13 +72,16 @@ public class SchemaOperator extends 
AbstractStreamOperatorAdapter<Event>
     private final String timezone;
     private final SchemaChangeBehavior schemaChangeBehavior;
     private final List<RouteRule> routingRules;
+    private final RouteMode routeMode;
 
     public SchemaOperator(
             List<RouteRule> routingRules,
+            RouteMode routeMode,
             Duration rpcTimeOut,
             SchemaChangeBehavior schemaChangeBehavior,
             String timezone) {
         this.routingRules = routingRules;
+        this.routeMode = routeMode;
         this.rpcTimeOut = rpcTimeOut;
         this.schemaChangeBehavior = schemaChangeBehavior;
         this.timezone = timezone;
@@ -100,7 +104,7 @@ public class SchemaOperator extends 
AbstractStreamOperatorAdapter<Event>
         subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
         upstreamSchemaTable = HashBasedTable.create();
         evolvedSchemaMap = new HashMap<>();
-        tableIdRouter = new TableIdRouter(routingRules);
+        tableIdRouter = new TableIdRouter(routingRules, routeMode);
         derivator = new SchemaDerivator();
         this.schemaOperatorMetrics =
                 new SchemaOperatorMetrics(
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperatorFactory.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperatorFactory.java
index 52378c15f..5652f28ad 100755
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperatorFactory.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperatorFactory.java
@@ -19,6 +19,7 @@ package 
org.apache.flink.cdc.runtime.operators.schema.distributed;
 
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -41,18 +42,23 @@ public class SchemaOperatorFactory extends 
SimpleOperatorFactory<Event>
 
     private final MetadataApplier metadataApplier;
     private final List<RouteRule> routingRules;
+    private final RouteMode routeMode;
     private final SchemaChangeBehavior schemaChangeBehavior;
     private final Duration rpcTimeout;
 
     public SchemaOperatorFactory(
             MetadataApplier metadataApplier,
             List<RouteRule> routingRules,
+            RouteMode routeMode,
             Duration rpcTimeout,
             SchemaChangeBehavior schemaChangeBehavior,
             String timezone) {
-        super(new SchemaOperator(routingRules, rpcTimeout, 
schemaChangeBehavior, timezone));
+        super(
+                new SchemaOperator(
+                        routingRules, routeMode, rpcTimeout, 
schemaChangeBehavior, timezone));
         this.metadataApplier = metadataApplier;
         this.routingRules = routingRules;
+        this.routeMode = routeMode;
         this.schemaChangeBehavior = schemaChangeBehavior;
         this.rpcTimeout = rpcTimeout;
     }
@@ -65,6 +71,7 @@ public class SchemaOperatorFactory extends 
SimpleOperatorFactory<Event>
                 operatorName,
                 metadataApplier,
                 routingRules,
+                routeMode,
                 schemaChangeBehavior,
                 rpcTimeout);
     }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java
index 8ed7d0916..e830b1f2a 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java
@@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.route.TableIdRouter;
@@ -58,6 +59,7 @@ public class BatchSchemaOperator extends 
AbstractStreamOperatorAdapter<Event>
     // Final fields that are set in constructor
     private final String timezone;
     private final List<RouteRule> routingRules;
+    private final RouteMode routeMode;
 
     // Transient fields that are set during open()
     private transient volatile Map<TableId, Schema> originalSchemaMap;
@@ -69,10 +71,14 @@ public class BatchSchemaOperator extends 
AbstractStreamOperatorAdapter<Event>
     private boolean alreadyMergedCreateTableTables = false;
 
     public BatchSchemaOperator(
-            List<RouteRule> routingRules, MetadataApplier metadataApplier, 
String timezone) {
+            List<RouteRule> routingRules,
+            RouteMode routeMode,
+            MetadataApplier metadataApplier,
+            String timezone) {
         this.chainingStrategy = ChainingStrategy.ALWAYS;
         this.timezone = timezone;
         this.routingRules = routingRules;
+        this.routeMode = routeMode;
         this.metadataApplier = metadataApplier;
     }
 
@@ -89,7 +95,7 @@ public class BatchSchemaOperator extends 
AbstractStreamOperatorAdapter<Event>
         super.open();
         this.originalSchemaMap = new HashMap<>();
         this.evolvedSchemaMap = new HashMap<>();
-        this.router = new TableIdRouter(routingRules);
+        this.router = new TableIdRouter(routingRules, routeMode);
         this.derivator = new SchemaDerivator();
         this.schemaManager = new SchemaManager(SchemaChangeBehavior.IGNORE);
     }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
index 53344dc87..3a241aac4 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
@@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
 import 
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.schema.Schema;
@@ -100,6 +101,7 @@ public class SchemaCoordinator extends SchemaRegistry {
             ExecutorService coordinatorExecutor,
             MetadataApplier metadataApplier,
             List<RouteRule> routes,
+            RouteMode routeMode,
             SchemaChangeBehavior schemaChangeBehavior,
             Duration rpcTimeout) {
         super(
@@ -108,6 +110,7 @@ public class SchemaCoordinator extends SchemaRegistry {
                 coordinatorExecutor,
                 metadataApplier,
                 routes,
+                routeMode,
                 schemaChangeBehavior,
                 rpcTimeout);
         this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinatorProvider.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinatorProvider.java
index 253b52cac..cedc9d37c 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinatorProvider.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinatorProvider.java
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.runtime.operators.schema.regular;
 
 import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -39,6 +40,7 @@ public class SchemaCoordinatorProvider implements 
OperatorCoordinator.Provider {
     private final String operatorName;
     private final MetadataApplier metadataApplier;
     private final List<RouteRule> routingRules;
+    private final RouteMode routeMode;
     private final SchemaChangeBehavior schemaChangeBehavior;
     private final Duration rpcTimeout;
 
@@ -47,12 +49,14 @@ public class SchemaCoordinatorProvider implements 
OperatorCoordinator.Provider {
             String operatorName,
             MetadataApplier metadataApplier,
             List<RouteRule> routingRules,
+            RouteMode routeMode,
             SchemaChangeBehavior schemaChangeBehavior,
             Duration rpcTimeout) {
         this.operatorID = operatorID;
         this.operatorName = operatorName;
         this.metadataApplier = metadataApplier;
         this.routingRules = routingRules;
+        this.routeMode = routeMode;
         this.schemaChangeBehavior = schemaChangeBehavior;
         this.rpcTimeout = rpcTimeout;
     }
@@ -75,6 +79,7 @@ public class SchemaCoordinatorProvider implements 
OperatorCoordinator.Provider {
                 coordinatorExecutor,
                 metadataApplier,
                 routingRules,
+                routeMode,
                 schemaChangeBehavior,
                 rpcTimeout);
     }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
index 7dd9a210a..b659b7234 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
@@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.FlushEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.route.TableIdRouter;
@@ -76,6 +77,7 @@ public class SchemaOperator extends 
AbstractStreamOperatorAdapter<Event>
     private final Duration rpcTimeout;
     private final SchemaChangeBehavior schemaChangeBehavior;
     private final List<RouteRule> routingRules;
+    private final RouteMode routeMode;
 
     // Transient fields that are set during open()
     private transient int subTaskId;
@@ -88,24 +90,26 @@ public class SchemaOperator extends 
AbstractStreamOperatorAdapter<Event>
 
     @VisibleForTesting
     public SchemaOperator(List<RouteRule> routingRules) {
-        this(routingRules, DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
+        this(routingRules, RouteMode.ALL_MATCH, 
DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
     }
 
     @VisibleForTesting
-    public SchemaOperator(List<RouteRule> routingRules, Duration rpcTimeOut) {
-        this(routingRules, rpcTimeOut, SchemaChangeBehavior.EVOLVE);
+    public SchemaOperator(List<RouteRule> routingRules, RouteMode routeMode, 
Duration rpcTimeOut) {
+        this(routingRules, routeMode, rpcTimeOut, SchemaChangeBehavior.EVOLVE);
     }
 
     @VisibleForTesting
     public SchemaOperator(
             List<RouteRule> routingRules,
+            RouteMode routeMode,
             Duration rpcTimeOut,
             SchemaChangeBehavior schemaChangeBehavior) {
-        this(routingRules, rpcTimeOut, schemaChangeBehavior, "UTC");
+        this(routingRules, routeMode, rpcTimeOut, schemaChangeBehavior, "UTC");
     }
 
     public SchemaOperator(
             List<RouteRule> routingRules,
+            RouteMode routeMode,
             Duration rpcTimeOut,
             SchemaChangeBehavior schemaChangeBehavior,
             String timezone) {
@@ -114,6 +118,7 @@ public class SchemaOperator extends 
AbstractStreamOperatorAdapter<Event>
         this.schemaChangeBehavior = schemaChangeBehavior;
         this.timezone = timezone;
         this.routingRules = routingRules;
+        this.routeMode = routeMode;
     }
 
     @Override
@@ -134,7 +139,7 @@ public class SchemaOperator extends 
AbstractStreamOperatorAdapter<Event>
         this.subTaskId = 
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
         this.originalSchemaMap = new HashMap<>();
         this.evolvedSchemaMap = new HashMap<>();
-        this.router = new TableIdRouter(routingRules);
+        this.router = new TableIdRouter(routingRules, routeMode);
         this.derivator = new SchemaDerivator();
     }
 
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java
index 630acc904..6afc3a138 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.runtime.operators.schema.regular;
 
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -40,18 +41,23 @@ public class SchemaOperatorFactory extends 
SimpleOperatorFactory<Event>
 
     private final MetadataApplier metadataApplier;
     private final List<RouteRule> routingRules;
+    private final RouteMode routeMode;
     private final SchemaChangeBehavior schemaChangeBehavior;
     private final Duration rpcTimeout;
 
     public SchemaOperatorFactory(
             MetadataApplier metadataApplier,
             List<RouteRule> routingRules,
+            RouteMode routeMode,
             Duration rpcTimeout,
             SchemaChangeBehavior schemaChangeBehavior,
             String timezone) {
-        super(new SchemaOperator(routingRules, rpcTimeout, 
schemaChangeBehavior, timezone));
+        super(
+                new SchemaOperator(
+                        routingRules, routeMode, rpcTimeout, 
schemaChangeBehavior, timezone));
         this.metadataApplier = metadataApplier;
         this.routingRules = routingRules;
+        this.routeMode = routeMode;
         this.schemaChangeBehavior = schemaChangeBehavior;
         this.rpcTimeout = rpcTimeout;
     }
@@ -64,6 +70,7 @@ public class SchemaOperatorFactory extends 
SimpleOperatorFactory<Event>
                 operatorName,
                 metadataApplier,
                 routingRules,
+                routeMode,
                 schemaChangeBehavior,
                 rpcTimeout);
     }
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java
index b8e487700..abfc475df 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.common.route;
 
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.runtime.operators.schema.common.SchemaTestBase;
 
 import org.junit.jupiter.api.Test;
@@ -166,7 +167,9 @@ public class TableIdRouterTest extends SchemaTestBase {
     private static List<String> testStdRegExpRoute(
             String sourceRouteRule, String sinkRouteRule, List<String> 
sourceTables) {
         TableIdRouter router =
-                new TableIdRouter(List.of(new RouteRule(sourceRouteRule, 
sinkRouteRule)));
+                new TableIdRouter(
+                        List.of(new RouteRule(sourceRouteRule, sinkRouteRule)),
+                        RouteMode.ALL_MATCH);
         return sourceTables.stream()
                 .map(TableId::parse)
                 .map(router::route)
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java
index 40b40b468..55d612b86 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.event.TruncateTableEvent;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.route.TableIdRouter;
@@ -615,7 +616,8 @@ public class SchemaDerivatorTest extends SchemaTestBase {
                                 new RouteRule("db_3.table_\\.*", 
"db_3.table_merged"),
                                 // Broadcast tables
                                 new RouteRule("db_4.table_1", "db_4.table_a"),
-                                new RouteRule("db_4.table_1", 
"db_4.table_b")));
+                                new RouteRule("db_4.table_1", "db_4.table_b")),
+                        RouteMode.ALL_MATCH);
         List<CreateTableEvent> createTableEvents =
                 Arrays.asList(
                         new CreateTableEvent(
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java
index af273d13d..42c78b030 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.route.TableIdRouter;
 import org.apache.flink.cdc.common.types.DataType;
@@ -72,7 +73,8 @@ public abstract class SchemaTestBase {
                             null),
                     new RouteRule("(inv_\\d+).(table_\\.*)", "$2.$1", null));
 
-    protected static final TableIdRouter TABLE_ID_ROUTER = new 
TableIdRouter(ROUTING_RULES);
+    protected static final TableIdRouter TABLE_ID_ROUTER =
+            new TableIdRouter(ROUTING_RULES, RouteMode.ALL_MATCH);
 
     protected static BinaryRecordData genBinRec(String rowType, Object... 
fields) {
         return (new BinaryRecordDataGenerator(quickGenRow(rowType).toArray(new 
DataType[0])))
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterMatchModeTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterMatchModeTest.java
new file mode 100644
index 000000000..14555947e
--- /dev/null
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterMatchModeTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.schema.common;
+
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
+import org.apache.flink.cdc.common.route.RouteRule;
+import org.apache.flink.cdc.common.route.TableIdRouter;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+/** Unit test for {@link TableIdRouter} with match-mode support. */
+public class TableIdRouterMatchModeTest {
+
+    @Test
+    void testFirstMatchMode() {
+        // Setup routing rules for first-match mode
+        List<RouteRule> routingRules =
+                Arrays.asList(
+                        // Sharded tables should be merged
+                        new RouteRule("mydb.order_\\.*", "ods_db.ods_orders"),
+                        new RouteRule("mydb.product_\\.*", 
"ods_db.ods_products"),
+                        // Catch-all rule for one-to-one mapping
+                        new RouteRule("mydb.\\.*", "ods_db.ods_<>", "<>"));
+
+        TableIdRouter router = new TableIdRouter(routingRules, 
RouteMode.FIRST_MATCH);
+
+        // Test sharded order tables - should match first rule and stop
+        assertThat(route(router, 
"mydb.order_1")).containsExactly("ods_db.ods_orders");
+        assertThat(route(router, 
"mydb.order_2")).containsExactly("ods_db.ods_orders");
+        assertThat(route(router, 
"mydb.order_100")).containsExactly("ods_db.ods_orders");
+
+        // Test sharded product tables - should match second rule and stop
+        assertThat(route(router, 
"mydb.product_1")).containsExactly("ods_db.ods_products");
+        assertThat(route(router, 
"mydb.product_2")).containsExactly("ods_db.ods_products");
+
+        // Test non-sharded tables - should match third rule (catch-all)
+        assertThat(route(router, 
"mydb.user")).containsExactly("ods_db.ods_user");
+        assertThat(route(router, 
"mydb.customer")).containsExactly("ods_db.ods_customer");
+        assertThat(route(router, 
"mydb.config")).containsExactly("ods_db.ods_config");
+    }
+
+    @Test
+    void testAllMatchMode() {
+        // Setup routing rules for all-match mode (default behavior)
+        List<RouteRule> routingRules =
+                Arrays.asList(
+                        new RouteRule("mydb.order_\\.*", "ods_db.ods_orders"),
+                        new RouteRule("mydb.\\.*", "ods_db.ods_<>", "<>"));
+
+        TableIdRouter router = new TableIdRouter(routingRules, 
RouteMode.ALL_MATCH);
+
+        // Test sharded order tables - should match BOTH rules
+        assertThat(route(router, "mydb.order_1"))
+                .containsExactlyInAnyOrder("ods_db.ods_orders", 
"ods_db.ods_order_1");
+        assertThat(route(router, "mydb.order_2"))
+                .containsExactlyInAnyOrder("ods_db.ods_orders", 
"ods_db.ods_order_2");
+
+        // Test non-sharded tables - should match only second rule
+        assertThat(route(router, 
"mydb.user")).containsExactly("ods_db.ods_user");
+    }
+
+    @Test
+    void testFirstMatchWithNoMatchingRules() {
+        List<RouteRule> routingRules =
+                Arrays.asList(
+                        new RouteRule("mydb.order_\\.*", "ods_db.ods_orders"),
+                        new RouteRule("mydb.product_\\.*", 
"ods_db.ods_products"));
+
+        TableIdRouter router = new TableIdRouter(routingRules, 
RouteMode.FIRST_MATCH);
+
+        // Table that doesn't match any rule should route to itself (implicit 
routing)
+        assertThat(route(router, 
"otherdb.user")).containsExactly("otherdb.user");
+    }
+
+    @Test
+    void testAllMatchWithMultipleMatchingRules() {
+        // Setup multiple overlapping rules
+        List<RouteRule> routingRules =
+                Arrays.asList(
+                        new RouteRule("db.table_\\.*", "db.merged_1"),
+                        new RouteRule("db.table_\\.*", "db.merged_2"),
+                        new RouteRule("db.table_\\.*", "db.merged_3"));
+
+        TableIdRouter router = new TableIdRouter(routingRules, 
RouteMode.ALL_MATCH);
+
+        // Should match all three rules
+        assertThat(route(router, "db.table_1"))
+                .containsExactlyInAnyOrder("db.merged_1", "db.merged_2", 
"db.merged_3");
+    }
+
+    private static List<String> route(TableIdRouter router, String tableId) {
+        return router.route(TableId.parse(tableId)).stream()
+                .map(TableId::toString)
+                .collect(Collectors.toList());
+    }
+
+    @Test
+    void testGroupSourceTablesByRouteRuleFirstMatch() {
+        List<RouteRule> routingRules =
+                Arrays.asList(
+                        new RouteRule("mydb.order_\\.*", "ods_db.ods_orders"),
+                        new RouteRule("mydb.product_\\.*", 
"ods_db.ods_products"),
+                        new RouteRule("mydb.\\.*", "ods_db.ods_<>", "<>"));
+
+        TableIdRouter router = new TableIdRouter(routingRules, 
RouteMode.FIRST_MATCH);
+
+        Set<TableId> tableIdSet =
+                new HashSet<>(
+                        Arrays.asList(
+                                TableId.parse("mydb.order_1"),
+                                TableId.parse("mydb.order_2"),
+                                TableId.parse("mydb.order_100"),
+                                TableId.parse("mydb.product_1"),
+                                TableId.parse("mydb.product_2"),
+                                TableId.parse("mydb.user"),
+                                TableId.parse("mydb.customer"),
+                                TableId.parse("mydb.config")));
+
+        List<Set<TableId>> groups = 
router.groupSourceTablesByRouteRule(tableIdSet);
+
+        assertThat(groups).hasSize(3);
+
+        assertThat(groups.get(0))
+                .containsExactlyInAnyOrder(
+                        TableId.parse("mydb.order_1"),
+                        TableId.parse("mydb.order_2"),
+                        TableId.parse("mydb.order_100"));
+
+        assertThat(groups.get(1))
+                .containsExactlyInAnyOrder(
+                        TableId.parse("mydb.product_1"), 
TableId.parse("mydb.product_2"));
+
+        assertThat(groups.get(2))
+                .containsExactlyInAnyOrder(
+                        TableId.parse("mydb.user"),
+                        TableId.parse("mydb.customer"),
+                        TableId.parse("mydb.config"));
+    }
+
+    @Test
+    void testGroupSourceTablesByRouteRuleFirstMatchWithUnmatchedTables() {
+        List<RouteRule> routingRules =
+                Arrays.asList(
+                        new RouteRule("mydb.order_\\.*", "ods_db.ods_orders"),
+                        new RouteRule("mydb.product_\\.*", 
"ods_db.ods_products"));
+
+        TableIdRouter router = new TableIdRouter(routingRules, 
RouteMode.FIRST_MATCH);
+
+        Set<TableId> tableIdSet =
+                new HashSet<>(
+                        Arrays.asList(
+                                TableId.parse("mydb.order_1"),
+                                TableId.parse("mydb.order_2"),
+                                TableId.parse("mydb.product_1"),
+                                TableId.parse("otherdb.user"),
+                                TableId.parse("otherdb.customer")));
+
+        List<Set<TableId>> groups = 
router.groupSourceTablesByRouteRule(tableIdSet);
+
+        assertThat(groups).hasSize(2);
+
+        // First group: order tables
+        assertThat(groups.get(0))
+                .containsExactlyInAnyOrder(
+                        TableId.parse("mydb.order_1"), 
TableId.parse("mydb.order_2"));
+
+        // Second group: product tables
+        
assertThat(groups.get(1)).containsExactlyInAnyOrder(TableId.parse("mydb.product_1"));
+
+        // Unmatched tables (otherdb.user, otherdb.customer) are not in any 
group
+        
assertThat(groups.stream().flatMap(Set::stream).collect(Collectors.toList()))
+                .doesNotContain(TableId.parse("otherdb.user"), 
TableId.parse("otherdb.customer"));
+    }
+
+    @Test
+    void testGroupSourceTablesByRouteRuleFirstMatchWithOverlappingRules() {
+        List<RouteRule> routingRules =
+                Arrays.asList(
+                        new RouteRule("db.table_\\.*", "db.merged_1"),
+                        new RouteRule("db.table_\\.*", "db.merged_2"),
+                        new RouteRule("db.table_\\.*", "db.merged_3"));
+
+        TableIdRouter router = new TableIdRouter(routingRules, 
RouteMode.FIRST_MATCH);
+
+        Set<TableId> tableIdSet =
+                new HashSet<>(
+                        Arrays.asList(
+                                TableId.parse("db.table_1"),
+                                TableId.parse("db.table_2"),
+                                TableId.parse("db.table_3")));
+
+        List<Set<TableId>> groups = 
router.groupSourceTablesByRouteRule(tableIdSet);
+
+        assertThat(groups).hasSize(3);
+
+        // All tables should match only the first rule (FIRST_MATCH mode)
+        assertThat(groups.get(0))
+                .containsExactlyInAnyOrder(
+                        TableId.parse("db.table_1"),
+                        TableId.parse("db.table_2"),
+                        TableId.parse("db.table_3"));
+
+        // Second and third groups should be empty
+        assertThat(groups.get(1)).isEmpty();
+        assertThat(groups.get(2)).isEmpty();
+    }
+}
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java
index 7ece5d20c..4245887f2 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.event.TruncateTableEvent;
 import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
@@ -107,6 +108,7 @@ public class SchemaEvolveTest extends SchemaTestBase {
                                 () ->
                                         new SchemaOperator(
                                                 ROUTING_RULES,
+                                                RouteMode.ALL_MATCH,
                                                 Duration.ofMinutes(3),
                                                 SchemaChangeBehavior.LENIENT,
                                                 "UTC"),
@@ -247,6 +249,7 @@ public class SchemaEvolveTest extends SchemaTestBase {
                                 () ->
                                         new SchemaOperator(
                                                 ROUTING_RULES,
+                                                RouteMode.ALL_MATCH,
                                                 Duration.ofMinutes(3),
                                                 SchemaChangeBehavior.IGNORE,
                                                 "UTC"),
@@ -349,6 +352,7 @@ public class SchemaEvolveTest extends SchemaTestBase {
                                         () ->
                                                 new SchemaOperator(
                                                         ROUTING_RULES,
+                                                        RouteMode.ALL_MATCH,
                                                         Duration.ofMinutes(3),
                                                         
SchemaChangeBehavior.EXCEPTION,
                                                         "UTC"),
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java
index 740c77bd4..7bdf37ed4 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
 import 
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
@@ -87,7 +88,8 @@ class SchemaEvolveTest {
         SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE;
 
         SchemaOperator schemaOperator =
-                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), 
behavior);
+                new SchemaOperator(
+                        new ArrayList<>(), RouteMode.ALL_MATCH, 
Duration.ofSeconds(30), behavior);
         RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
                 RegularEventOperatorTestHarness.withDurationAndBehavior(
                         schemaOperator, 17, Duration.ofSeconds(3), behavior);
@@ -365,7 +367,8 @@ class SchemaEvolveTest {
         SchemaChangeBehavior behavior = SchemaChangeBehavior.TRY_EVOLVE;
 
         SchemaOperator schemaOperator =
-                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), 
behavior);
+                new SchemaOperator(
+                        new ArrayList<>(), RouteMode.ALL_MATCH, 
Duration.ofSeconds(30), behavior);
         RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
                 RegularEventOperatorTestHarness.withDurationAndBehavior(
                         schemaOperator, 17, Duration.ofSeconds(3), behavior);
@@ -643,7 +646,8 @@ class SchemaEvolveTest {
         SchemaChangeBehavior behavior = SchemaChangeBehavior.EXCEPTION;
 
         SchemaOperator schemaOperator =
-                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), 
behavior);
+                new SchemaOperator(
+                        new ArrayList<>(), RouteMode.ALL_MATCH, 
Duration.ofSeconds(30), behavior);
         RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
                 RegularEventOperatorTestHarness.withDurationAndBehavior(
                         schemaOperator, 17, Duration.ofSeconds(3), behavior);
@@ -737,7 +741,8 @@ class SchemaEvolveTest {
         SchemaChangeBehavior behavior = SchemaChangeBehavior.IGNORE;
 
         SchemaOperator schemaOperator =
-                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), 
behavior);
+                new SchemaOperator(
+                        new ArrayList<>(), RouteMode.ALL_MATCH, 
Duration.ofSeconds(30), behavior);
         RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
                 RegularEventOperatorTestHarness.withDurationAndBehavior(
                         schemaOperator, 17, Duration.ofSeconds(3), behavior);
@@ -1033,7 +1038,8 @@ class SchemaEvolveTest {
         SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE;
 
         SchemaOperator schemaOperator =
-                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), 
behavior);
+                new SchemaOperator(
+                        new ArrayList<>(), RouteMode.ALL_MATCH, 
Duration.ofSeconds(30), behavior);
         RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
                 
RegularEventOperatorTestHarness.withDurationAndFineGrainedBehaviorWithError(
                         schemaOperator,
@@ -1131,7 +1137,8 @@ class SchemaEvolveTest {
         SchemaChangeBehavior behavior = SchemaChangeBehavior.TRY_EVOLVE;
 
         SchemaOperator schemaOperator =
-                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), 
behavior);
+                new SchemaOperator(
+                        new ArrayList<>(), RouteMode.ALL_MATCH, 
Duration.ofSeconds(30), behavior);
 
         // All types of schema change events will be sent to the sink
         // AddColumn and RenameColumn events will always fail
@@ -1461,7 +1468,8 @@ class SchemaEvolveTest {
         SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE;
 
         SchemaOperator schemaOperator =
-                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), 
behavior);
+                new SchemaOperator(
+                        new ArrayList<>(), RouteMode.ALL_MATCH, 
Duration.ofSeconds(30), behavior);
 
         // All types of schema change events will be sent to the sink
         // AddColumn and RenameColumn events will always fail
@@ -1795,7 +1803,8 @@ class SchemaEvolveTest {
         SchemaChangeBehavior behavior = SchemaChangeBehavior.LENIENT;
 
         SchemaOperator schemaOperator =
-                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), 
behavior);
+                new SchemaOperator(
+                        new ArrayList<>(), RouteMode.ALL_MATCH, 
Duration.ofSeconds(30), behavior);
         RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
                 RegularEventOperatorTestHarness.withDurationAndBehavior(
                         schemaOperator, 17, Duration.ofSeconds(3), behavior);
@@ -2196,7 +2205,8 @@ class SchemaEvolveTest {
         SchemaChangeBehavior behavior = SchemaChangeBehavior.LENIENT;
 
         SchemaOperator schemaOperator =
-                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), 
behavior);
+                new SchemaOperator(
+                        new ArrayList<>(), RouteMode.ALL_MATCH, 
Duration.ofSeconds(30), behavior);
         RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
                 RegularEventOperatorTestHarness.withDurationAndBehavior(
                         schemaOperator, 17, Duration.ofSeconds(3), behavior);
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorTest.java
index bd7ed4da0..b547a820b 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.RowType;
@@ -122,7 +123,7 @@ class SchemaOperatorTest {
     @Test
     void testProcessSchemaChangeEventWithTimeOut() throws Exception {
         SchemaOperator schemaOperator =
-                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(1));
+                new SchemaOperator(new ArrayList<>(), RouteMode.ALL_MATCH, 
Duration.ofSeconds(1));
         RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
                 RegularEventOperatorTestHarness.withDuration(
                         schemaOperator, 1, Duration.ofSeconds(3));
@@ -141,7 +142,7 @@ class SchemaOperatorTest {
     @Test
     void testProcessSchemaChangeEventWithOutTimeOut() throws Exception {
         SchemaOperator schemaOperator =
-                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30));
+                new SchemaOperator(new ArrayList<>(), RouteMode.ALL_MATCH, 
Duration.ofSeconds(30));
         RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
                 RegularEventOperatorTestHarness.withDuration(
                         schemaOperator, 1, Duration.ofSeconds(3));
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java
index cbc1f407d..3e81ec82b 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.runtime.testutils.operators;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.FlushEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.runtime.operators.AbstractStreamOperatorAdapter;
@@ -99,6 +100,7 @@ public class DistributedEventOperatorTestHarness<
                         Executors.newFixedThreadPool(1),
                         new CollectingMetadataApplier(applyDuration),
                         new ArrayList<>(),
+                        RouteMode.ALL_MATCH,
                         SchemaChangeBehavior.LENIENT,
                         rpcTimeout);
         this.schemaRegistryGateway = new 
TestingSchemaRegistryGateway(schemaCoordinator);
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java
index 8a3a21ba6..9b2eb2ee4 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.pipeline.RouteMode;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.runtime.operators.AbstractStreamOperatorAdapter;
@@ -124,6 +125,7 @@ public class RegularEventOperatorTestHarness<
                         new CollectingMetadataApplier(
                                 schemaEvolveDuration, enabledEventTypes, 
errorsOnEventTypes),
                         new ArrayList<>(),
+                        RouteMode.ALL_MATCH,
                         behavior,
                         rpcTimeout);
         schemaRegistryGateway = new 
TestingSchemaRegistryGateway(schemaRegistry);

Reply via email to