This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 154bb6e880af890a94a1a035e58ce00796022740
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
AuthorDate: Mon Feb 25 12:02:04 2019 +0800

    [FLINK-11741] [runtime] Replace ArrayListSerializer's ensureCompatibility method with SelfResolvingTypeSerializer implementation
---
 .../flink/runtime/state/ArrayListSerializer.java | 49 ++++++++++------------
 1 file changed, 23 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index b1668fc..298b1df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -17,15 +17,12 @@
  */
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -33,7 +30,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 
 @SuppressWarnings("ForLoopReplaceableByForEach")
-final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
+final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>>
+		implements TypeSerializerConfigSnapshot.SelfResolvingTypeSerializer<ArrayList<T>> {
 
 	private static final long serialVersionUID = 1119562170939152304L;
 
@@ -146,29 +144,28 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
 	}
 
 	/**
-	 * NOTE: this method cannot be removed until {@link CollectionSerializerConfigSnapshot} is fully removed.
+	 * We need to implement this method as a {@link TypeSerializerConfigSnapshot.SelfResolvingTypeSerializer}
+	 * because this serializer was previously returning a shared {@link CollectionSerializerConfigSnapshot}
+	 * as its snapshot.
+	 *
+	 * <p>When the {@link CollectionSerializerConfigSnapshot} is restored, it is incapable of redirecting
+	 * the compatibility check to {@link ArrayListSerializerSnapshot}, so we do it here.
 	 */
 	@Override
-	@SuppressWarnings("deprecation")
-	public CompatibilityResult<ArrayList<T>> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-		if (configSnapshot instanceof CollectionSerializerConfigSnapshot) {
-			Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> previousElemSerializerAndConfig =
-				((CollectionSerializerConfigSnapshot<?, ?>) configSnapshot).getSingleNestedSerializerAndConfig();
-
-			CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
-				previousElemSerializerAndConfig.f0,
-				UnloadableDummyTypeSerializer.class,
-				previousElemSerializerAndConfig.f1,
-				elementSerializer);
-
-			if (!compatResult.isRequiresMigration()) {
-				return CompatibilityResult.compatible();
-			} else if (compatResult.getConvertDeserializer() != null) {
-				return CompatibilityResult.requiresMigration(
-					new ArrayListSerializer<>(new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
-			}
+	public TypeSerializerSchemaCompatibility<ArrayList<T>> resolveSchemaCompatibilityViaRedirectingToNewSnapshotClass(
+			TypeSerializerConfigSnapshot<ArrayList<T>> deprecatedConfigSnapshot) {
+
+		if (deprecatedConfigSnapshot instanceof CollectionSerializerConfigSnapshot) {
+			CollectionSerializerConfigSnapshot<ArrayList<T>, T> castedLegacySnapshot =
+				(CollectionSerializerConfigSnapshot<ArrayList<T>, T>) deprecatedConfigSnapshot;
+
+			ArrayListSerializerSnapshot<T> newSnapshot = new ArrayListSerializerSnapshot<>();
+			return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+				this,
+				newSnapshot,
+				castedLegacySnapshot.getNestedSerializerSnapshots());
 		}
 
-		return CompatibilityResult.requiresMigration();
+		return TypeSerializerSchemaCompatibility.incompatible();
 	}
 }