This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bd3b011513f8432da83d0892b59d1e8852620e93
Author: Hangxiang Yu <master...@gmail.com>
AuthorDate: Tue Jan 24 10:37:23 2023 +0800

    [FLINK-30613][serializer] Migrate ScalaCaseClassSerializerSnapshot to 
implement new method of resolving schema compatibility
---
 .../api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java  | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git 
a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java
 
b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java
index 1cb46fdeb7d..243d03bd44b 100644
--- 
a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java
+++ 
b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializerSnapshot.java
@@ -86,8 +86,14 @@ public final class ScalaCaseClassSerializerSnapshot<T 
extends scala.Product>
 
     @Override
     protected CompositeTypeSerializerSnapshot.OuterSchemaCompatibility
-            resolveOuterSchemaCompatibility(ScalaCaseClassSerializer<T> 
newSerializer) {
-        return (Objects.equals(type, newSerializer.getTupleClass()))
+            resolveOuterSchemaCompatibility(TypeSerializerSnapshot<T> 
oldSerializerSnapshot) {
+        if (!(oldSerializerSnapshot instanceof 
ScalaCaseClassSerializerSnapshot)) {
+            return OuterSchemaCompatibility.INCOMPATIBLE;
+        }
+
+        ScalaCaseClassSerializerSnapshot<T> oldSnapshot =
+                (ScalaCaseClassSerializerSnapshot<T>) oldSerializerSnapshot;
+        return (Objects.equals(type, oldSnapshot.type))
                 ? OuterSchemaCompatibility.COMPATIBLE_AS_IS
                 : OuterSchemaCompatibility.INCOMPATIBLE;
     }

Reply via email to