This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0e5de813258bfebda30a7c297a33546a4404c7cf
Author: Hangxiang Yu <master...@gmail.com>
AuthorDate: Sat Jan 6 13:36:40 2024 +0800

    [FLINK-30613][serializer] Make StateBackend use the new method of resolving schema compatibility
---
 .../flink/runtime/state/StateSerializerProvider.java       |  8 ++++++--
 .../contrib/streaming/state/RocksDBKeyedStateBackend.java  |  4 +++-
 .../streaming/api/operators/InternalTimerServiceImpl.java  | 14 ++++++++------
 3 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
index 81f74e04d14..4ed106e88cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java
@@ -304,7 +304,9 @@ public abstract class StateSerializerProvider<T> {
             }
 
             TypeSerializerSchemaCompatibility<T> result =
-                    previousSerializerSnapshot.resolveSchemaCompatibility(newSerializer);
+                    newSerializer
+                            .snapshotConfiguration()
+                            .resolveSchemaCompatibility(previousSerializerSnapshot);
             if (result.isIncompatible()) {
                 invalidateCurrentSchemaSerializerAccess();
             }
@@ -358,7 +360,9 @@ public abstract class StateSerializerProvider<T> {
             this.previousSerializerSnapshot = previousSerializerSnapshot;
 
             TypeSerializerSchemaCompatibility<T> result =
-                    previousSerializerSnapshot.resolveSchemaCompatibility(registeredSerializer);
+                    Preconditions.checkNotNull(registeredSerializer)
+                            .snapshotConfiguration()
+                            .resolveSchemaCompatibility(previousSerializerSnapshot);
             if (result.isIncompatible()) {
                 invalidateCurrentSchemaSerializerAccess();
             }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 49daa54f143..9766ebde385 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -856,7 +856,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                 (TypeSerializer<UK>) newMapStateSerializer.getKeySerializer();
 
         TypeSerializerSchemaCompatibility<UK> keyCompatibility =
-                previousKeySerializerSnapshot.resolveSchemaCompatibility(newUserKeySerializer);
+                newUserKeySerializer
+                        .snapshotConfiguration()
+                        .resolveSchemaCompatibility(previousKeySerializerSnapshot);
         return keyCompatibility.isCompatibleAsIs();
     }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
index e2c7e4139b2..1345916132a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
@@ -149,9 +149,10 @@ public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> {
             // the following is the case where we restore
             if (restoredTimersSnapshot != null) {
                 TypeSerializerSchemaCompatibility<K> keySerializerCompatibility =
-                        restoredTimersSnapshot
-                                .getKeySerializerSnapshot()
-                                .resolveSchemaCompatibility(keySerializer);
+                        keySerializer
+                                .snapshotConfiguration()
+                                .resolveSchemaCompatibility(
+                                        restoredTimersSnapshot.getKeySerializerSnapshot());
 
                 if (keySerializerCompatibility.isIncompatible()
                         || keySerializerCompatibility.isCompatibleAfterMigration()) {
@@ -160,9 +161,10 @@ public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> {
                 }
 
                 TypeSerializerSchemaCompatibility<N> namespaceSerializerCompatibility =
-                        restoredTimersSnapshot
-                                .getNamespaceSerializerSnapshot()
-                                .resolveSchemaCompatibility(namespaceSerializer);
+                        namespaceSerializer
+                                .snapshotConfiguration()
+                                .resolveSchemaCompatibility(
+                                        restoredTimersSnapshot.getNamespaceSerializerSnapshot());
 
                 restoredTimersSnapshot = null;