This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
     new 612ff5d35 [FLINK-37713][runtime] Fix SchemaManager was recreated in SchemaRegistry after job failover
612ff5d35 is described below

commit 612ff5d352b2171b9e218f3e0d0c0b31921a3429
Author: Kunni <lvyanquan....@alibaba-inc.com>
AuthorDate: Thu Apr 24 14:22:21 2025 +0800

    [FLINK-37713][runtime] Fix SchemaManager was recreated in SchemaRegistry after job failover

    This closes #4001
---
 .../flink/cdc/runtime/operators/schema/common/SchemaRegistry.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
index 1e681a79f..8bea59548 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
@@ -123,7 +123,9 @@ public abstract class SchemaRegistry implements OperatorCoordinator, Coordinatio
         this.currentParallelism = context.currentParallelism();
         this.activeSinkWriters = ConcurrentHashMap.newKeySet();
         this.failedReasons = new ConcurrentHashMap<>();
-        this.schemaManager = new SchemaManager();
+        if (this.schemaManager == null) {
+            this.schemaManager = new SchemaManager();
+        }
         this.router = new TableIdRouter(routingRules);
     }