This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 75c62a0ade2adcd9e54845370da7649b7421ade2
Author: Hangxiang Yu <master...@gmail.com>
AuthorDate: Tue Jan 24 01:02:18 2023 +0800

    [FLINK-30613][serializer] Migrate JavaEitherSerializerSnapshot to implement new method of resolving schema compatibility
---
 .../typeutils/runtime/EitherSerializerSnapshot.java   | 19 +++----------------
 .../runtime/JavaEitherSerializerSnapshot.java         | 19 +++++++++++++++++++
 2 files changed, 22 insertions(+), 16 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
index bd66a87d5fe..5c463eab3af 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
@@ -19,10 +19,8 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -104,19 +102,8 @@ public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnaps
                 nestedSnapshot.getRestoredNestedSerializer(1));
     }
 
-    @Override
-    public TypeSerializerSchemaCompatibility<Either<L, R>> 
resolveSchemaCompatibility(
-            TypeSerializer<Either<L, R>> newSerializer) {
-        checkState(nestedSnapshot != null);
-
-        if (newSerializer instanceof EitherSerializer) {
-            // delegate compatibility check to the new snapshot class
-            return 
CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
-                    newSerializer,
-                    new JavaEitherSerializerSnapshot<>(),
-                    nestedSnapshot.getNestedSerializerSnapshots());
-        } else {
-            return TypeSerializerSchemaCompatibility.incompatible();
-        }
+    @Nullable
+    public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() {
+        return nestedSnapshot == null ? null : 
nestedSnapshot.getNestedSerializerSnapshots();
     }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java
index 3ce1cbef50b..2266cea6eec 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java
@@ -19,9 +19,14 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.types.Either;
 
+import java.util.Objects;
+
 /** Snapshot class for the {@link EitherSerializer}. */
 public class JavaEitherSerializerSnapshot<L, R>
         extends CompositeTypeSerializerSnapshot<Either<L, R>, 
EitherSerializer<L, R>> {
@@ -44,6 +49,20 @@ public class JavaEitherSerializerSnapshot<L, R>
         return CURRENT_VERSION;
     }
 
+    @Override
+    public TypeSerializerSchemaCompatibility<Either<L, R>> 
resolveSchemaCompatibility(
+            TypeSerializerSnapshot<Either<L, R>> oldSerializerSnapshot) {
+        if (oldSerializerSnapshot instanceof EitherSerializerSnapshot) {
+            return 
CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+                    oldSerializerSnapshot,
+                    this,
+                    Objects.requireNonNull(
+                            ((EitherSerializerSnapshot<L, R>) 
oldSerializerSnapshot)
+                                    .getNestedSerializerSnapshots()));
+        }
+        return super.resolveSchemaCompatibility(oldSerializerSnapshot);
+    }
+
     @Override
     protected EitherSerializer<L, R> 
createOuterSerializerWithNestedSerializers(
             TypeSerializer<?>[] nestedSerializers) {

Reply via email to