This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 612ff5d35 [FLINK-37713][runtime] Fix SchemaManager was recreated in SchemaRegistry after job failover
612ff5d35 is described below

commit 612ff5d352b2171b9e218f3e0d0c0b31921a3429
Author: Kunni <lvyanquan....@alibaba-inc.com>
AuthorDate: Thu Apr 24 14:22:21 2025 +0800

    [FLINK-37713][runtime] Fix SchemaManager was recreated in SchemaRegistry after job failover
    
    This closes #4001
---
 .../flink/cdc/runtime/operators/schema/common/SchemaRegistry.java     | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
index 1e681a79f..8bea59548 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
@@ -123,7 +123,9 @@ public abstract class SchemaRegistry implements OperatorCoordinator, Coordinatio
         this.currentParallelism = context.currentParallelism();
         this.activeSinkWriters = ConcurrentHashMap.newKeySet();
         this.failedReasons = new ConcurrentHashMap<>();
-        this.schemaManager = new SchemaManager();
+        if (this.schemaManager == null) {
+            this.schemaManager = new SchemaManager();
+        }
         this.router = new TableIdRouter(routingRules);
     }
 

Reply via email to