This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0e5de813258bfebda30a7c297a33546a4404c7cf
Author: Hangxiang Yu <master...@gmail.com>
AuthorDate: Sat Jan 6 13:36:40 2024 +0800

    [FLINK-30613][serializer] Make StateBackend use the new method of resolving schema compatibility
---
 .../flink/runtime/state/StateSerializerProvider.java       |  8 ++++++--
 .../contrib/streaming/state/RocksDBKeyedStateBackend.java  |  4 +++-
 .../streaming/api/operators/InternalTimerServiceImpl.java  | 14 ++++++++------
 3 files changed, 17 insertions(+), 9 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
index 81f74e04d14..4ed106e88cc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
@@ -304,7 +304,9 @@ public abstract class StateSerializerProvider<T> {
             }
 
             TypeSerializerSchemaCompatibility<T> result =
-                    
previousSerializerSnapshot.resolveSchemaCompatibility(newSerializer);
+                    newSerializer
+                            .snapshotConfiguration()
+                            
.resolveSchemaCompatibility(previousSerializerSnapshot);
             if (result.isIncompatible()) {
                 invalidateCurrentSchemaSerializerAccess();
             }
@@ -358,7 +360,9 @@ public abstract class StateSerializerProvider<T> {
             this.previousSerializerSnapshot = previousSerializerSnapshot;
 
             TypeSerializerSchemaCompatibility<T> result =
-                    
previousSerializerSnapshot.resolveSchemaCompatibility(registeredSerializer);
+                    Preconditions.checkNotNull(registeredSerializer)
+                            .snapshotConfiguration()
+                            
.resolveSchemaCompatibility(previousSerializerSnapshot);
             if (result.isIncompatible()) {
                 invalidateCurrentSchemaSerializerAccess();
             }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 49daa54f143..9766ebde385 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -856,7 +856,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 (TypeSerializer<UK>) newMapStateSerializer.getKeySerializer();
 
         TypeSerializerSchemaCompatibility<UK> keyCompatibility =
-                
previousKeySerializerSnapshot.resolveSchemaCompatibility(newUserKeySerializer);
+                newUserKeySerializer
+                        .snapshotConfiguration()
+                        
.resolveSchemaCompatibility(previousKeySerializerSnapshot);
         return keyCompatibility.isCompatibleAsIs();
     }
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
index e2c7e4139b2..1345916132a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
@@ -149,9 +149,10 @@ public class InternalTimerServiceImpl<K, N> implements 
InternalTimerService<N> {
             // the following is the case where we restore
             if (restoredTimersSnapshot != null) {
                 TypeSerializerSchemaCompatibility<K> 
keySerializerCompatibility =
-                        restoredTimersSnapshot
-                                .getKeySerializerSnapshot()
-                                .resolveSchemaCompatibility(keySerializer);
+                        keySerializer
+                                .snapshotConfiguration()
+                                .resolveSchemaCompatibility(
+                                        
restoredTimersSnapshot.getKeySerializerSnapshot());
 
                 if (keySerializerCompatibility.isIncompatible()
                         || 
keySerializerCompatibility.isCompatibleAfterMigration()) {
@@ -160,9 +161,10 @@ public class InternalTimerServiceImpl<K, N> implements 
InternalTimerService<N> {
                 }
 
                 TypeSerializerSchemaCompatibility<N> 
namespaceSerializerCompatibility =
-                        restoredTimersSnapshot
-                                .getNamespaceSerializerSnapshot()
-                                
.resolveSchemaCompatibility(namespaceSerializer);
+                        namespaceSerializer
+                                .snapshotConfiguration()
+                                .resolveSchemaCompatibility(
+                                        
restoredTimersSnapshot.getNamespaceSerializerSnapshot());
 
                 restoredTimersSnapshot = null;
 

Reply via email to