This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a3e0650562f9117381cad87f57d6b4102d7b1421 Author: Hangxiang Yu <master...@gmail.com> AuthorDate: Tue Jan 24 00:56:54 2023 +0800 [FLINK-30613][serializer] Migrate GenericArraySerializerSnapshot to implement new method of resolving schema compatibility --- .../base/GenericArraySerializerConfigSnapshot.java | 23 +++++---------- .../base/GenericArraySerializerSnapshot.java | 33 ++++++++++++++++++++-- .../CompositeTypeSerializerUpgradeTest.java | 16 +++++++++++ 3 files changed, 54 insertions(+), 18 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java index 4b8af37a3d9..cb9369cc3e7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java @@ -19,10 +19,7 @@ package org.apache.flink.api.common.typeutils.base; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -108,19 +105,13 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial componentClass, nestedSnapshot.getRestoredNestedSerializer(0)); } - @Override - public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility( - TypeSerializer<C[]> newSerializer) { - checkState(nestedSnapshot != null); - - if (!(newSerializer instanceof GenericArraySerializer)) { - return TypeSerializerSchemaCompatibility.incompatible(); - } + @Nullable + public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() { + return nestedSnapshot == null ? null : nestedSnapshot.getNestedSerializerSnapshots(); + } - // delegate to the new snapshot class - return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( - newSerializer, - new GenericArraySerializerSnapshot<>(componentClass), - nestedSnapshot.getNestedSerializerSnapshots()); + @Nullable + public Class<C> getComponentClass() { + return componentClass; } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java index 15eb10eac42..bdb6cef922c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java @@ -19,12 +19,16 @@ package org.apache.flink.api.common.typeutils.base; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.InstantiationUtil; import java.io.IOException; +import java.util.Objects; /** * Point-in-time configuration of a {@link GenericArraySerializer}. @@ -76,10 +80,35 @@ public final class GenericArraySerializerSnapshot<C> this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader); } + @Override + public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility( + TypeSerializerSnapshot<C[]> oldSerializerSnapshot) { + if (oldSerializerSnapshot instanceof GenericArraySerializerConfigSnapshot) { + return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( + oldSerializerSnapshot, + this, + Objects.requireNonNull( + ((GenericArraySerializerConfigSnapshot<C>) oldSerializerSnapshot) + .getNestedSerializerSnapshots())); + } + return super.resolveSchemaCompatibility(oldSerializerSnapshot); + } + @Override protected OuterSchemaCompatibility resolveOuterSchemaCompatibility( - GenericArraySerializer<C> newSerializer) { - return (this.componentClass == newSerializer.getComponentClass()) + TypeSerializerSnapshot<C[]> oldSerializerSnapshot) { + Class<C> componentClass; + if (oldSerializerSnapshot instanceof GenericArraySerializerSnapshot) { + componentClass = + ((GenericArraySerializerSnapshot<C>) oldSerializerSnapshot).componentClass; + } else if (oldSerializerSnapshot instanceof GenericArraySerializerConfigSnapshot) { + componentClass = + ((GenericArraySerializerConfigSnapshot<C>) oldSerializerSnapshot) + .getComponentClass(); + } else { + return OuterSchemaCompatibility.INCOMPATIBLE; + } + return (this.componentClass == componentClass) ? OuterSchemaCompatibility.COMPATIBLE_AS_IS : OuterSchemaCompatibility.INCOMPATIBLE; } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUpgradeTest.java index 2b0d40e1753..6248a9d49aa 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUpgradeTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUpgradeTest.java @@ -20,16 +20,19 @@ package org.apache.flink.api.common.typeutils; import org.apache.flink.FlinkVersion; import org.apache.flink.api.common.typeutils.base.GenericArraySerializer; +import org.apache.flink.api.common.typeutils.base.GenericArraySerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.typeutils.runtime.EitherSerializer; import org.apache.flink.types.Either; import org.hamcrest.Matcher; +import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Collection; +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; import static org.hamcrest.Matchers.is; /** A {@link TypeSerializerUpgradeTestBase} for {@link GenericArraySerializer}. */ @@ -143,4 +146,17 @@ class CompositeTypeSerializerUpgradeTest extends TypeSerializerUpgradeTestBase<O return TypeSerializerMatchers.isCompatibleAsIs(); } } + + @Test + public void testUpgradeFromDeprecatedSnapshot() { + GenericArraySerializer<String> genericArraySerializer = + new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE); + GenericArraySerializerConfigSnapshot<String> oldSnapshot = + new GenericArraySerializerConfigSnapshot<>(genericArraySerializer); + TypeSerializerSchemaCompatibility<String[]> schemaCompatibility = + genericArraySerializer + .snapshotConfiguration() + .resolveSchemaCompatibility(oldSnapshot); + assertThat(schemaCompatibility.isCompatibleAsIs()).isTrue(); + } }