This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 30a8d2c9423d0ed9da4b80d41889e64de7ac1576
Author: Hangxiang Yu <master...@gmail.com>
AuthorDate: Mon Feb 6 23:44:13 2023 +0800

    [FLINK-30613][serializer] Migrate GenericTypeSerializerSnapshot to 
implement new method of resolving schema compatibility
---
 .../api/common/typeutils/GenericTypeSerializerSnapshot.java | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericTypeSerializerSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericTypeSerializerSnapshot.java
index ac7e2f06f16..9cc6cfe60e4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericTypeSerializerSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericTypeSerializerSnapshot.java
@@ -81,13 +81,16 @@ public abstract class GenericTypeSerializerSnapshot<T, S 
extends TypeSerializer>
 
     @Override
     public final TypeSerializerSchemaCompatibility<T> 
resolveSchemaCompatibility(
-            TypeSerializer<T> newSerializer) {
-        if (!serializerClass().isInstance(newSerializer)) {
+            TypeSerializerSnapshot<T> oldSerializerSnapshot) {
+        if (!(oldSerializerSnapshot instanceof GenericTypeSerializerSnapshot)) 
{
             return TypeSerializerSchemaCompatibility.incompatible();
         }
-        @SuppressWarnings("unchecked")
-        S casted = (S) newSerializer;
-        if (typeClass == getTypeClass(casted)) {
+        GenericTypeSerializerSnapshot<T, S> 
previousGenericTypeSerializerSnapshot =
+                (GenericTypeSerializerSnapshot<T, S>) oldSerializerSnapshot;
+        if (serializerClass() != 
previousGenericTypeSerializerSnapshot.serializerClass()) {
+            return TypeSerializerSchemaCompatibility.incompatible();
+        }
+        if (typeClass == previousGenericTypeSerializerSnapshot.typeClass) {
             return TypeSerializerSchemaCompatibility.compatibleAsIs();
         } else {
             return TypeSerializerSchemaCompatibility.incompatible();

Reply via email to