This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 75c62a0ade2adcd9e54845370da7649b7421ade2 Author: Hangxiang Yu <master...@gmail.com> AuthorDate: Tue Jan 24 01:02:18 2023 +0800 [FLINK-30613][serializer] Migrate JavaEitherSerializerSnapshot to implement new method of resolving schema compatibility --- .../typeutils/runtime/EitherSerializerSnapshot.java | 19 +++---------------- .../runtime/JavaEitherSerializerSnapshot.java | 19 +++++++++++++++++++ 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java index bd66a87d5fe..5c463eab3af 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java @@ -19,10 +19,8 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -104,19 +102,8 @@ public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnaps nestedSnapshot.getRestoredNestedSerializer(1)); } - @Override - public TypeSerializerSchemaCompatibility<Either<L, R>> resolveSchemaCompatibility( - TypeSerializer<Either<L, R>> newSerializer) { - checkState(nestedSnapshot != null); - - if (newSerializer instanceof EitherSerializer) { - // delegate compatibility check to the new snapshot class - return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( - newSerializer, - new JavaEitherSerializerSnapshot<>(), - nestedSnapshot.getNestedSerializerSnapshots()); - } else { - return TypeSerializerSchemaCompatibility.incompatible(); - } + @Nullable + public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() { + return nestedSnapshot == null ? null : nestedSnapshot.getNestedSerializerSnapshots(); } } diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java index 3ce1cbef50b..2266cea6eec 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java @@ -19,9 +19,14 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.types.Either; +import java.util.Objects; + /** Snapshot class for the {@link EitherSerializer}. */ public class JavaEitherSerializerSnapshot<L, R> extends CompositeTypeSerializerSnapshot<Either<L, R>, EitherSerializer<L, R>> { @@ -44,6 +49,20 @@ public class JavaEitherSerializerSnapshot<L, R> return CURRENT_VERSION; } + @Override + public TypeSerializerSchemaCompatibility<Either<L, R>> resolveSchemaCompatibility( + TypeSerializerSnapshot<Either<L, R>> oldSerializerSnapshot) { + if (oldSerializerSnapshot instanceof EitherSerializerSnapshot) { + return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( + oldSerializerSnapshot, + this, + Objects.requireNonNull( + ((EitherSerializerSnapshot<L, R>) oldSerializerSnapshot) + .getNestedSerializerSnapshots())); + } + return super.resolveSchemaCompatibility(oldSerializerSnapshot); + } + @Override protected EitherSerializer<L, R> createOuterSerializerWithNestedSerializers( TypeSerializer<?>[] nestedSerializers) {