This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 154bb6e880af890a94a1a035e58ce00796022740
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
AuthorDate: Mon Feb 25 12:02:04 2019 +0800

    [FLINK-11741] [runtime] Replace ArrayListSerializer's ensureCompatibility method with SelfResolvingTypeSerializer implementation
---
 .../flink/runtime/state/ArrayListSerializer.java   | 49 ++++++++++------------
 1 file changed, 23 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index b1668fc..298b1df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -17,15 +17,12 @@
  */
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import 
org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -33,7 +30,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 
 @SuppressWarnings("ForLoopReplaceableByForEach")
-final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> 
{
+final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>>
+               implements 
TypeSerializerConfigSnapshot.SelfResolvingTypeSerializer<ArrayList<T>> {
 
        private static final long serialVersionUID = 1119562170939152304L;
 
@@ -146,29 +144,28 @@ final public class ArrayListSerializer<T> extends 
TypeSerializer<ArrayList<T>> {
        }
 
        /**
-        * NOTE: this method cannot be removed until {@link 
CollectionSerializerConfigSnapshot} is fully removed.
+        * We need to implement this method as a {@link TypeSerializerConfigSnapshot.SelfResolvingTypeSerializer}
+        * because this serializer was previously returning a shared {@link CollectionSerializerConfigSnapshot}
+        * as its snapshot.
+        *
+        * <p>When the {@link CollectionSerializerConfigSnapshot} is restored, it is incapable of redirecting
+        * the compatibility check to {@link ArrayListSerializerSnapshot}, so we do it here.
         */
        @Override
-       @SuppressWarnings("deprecation")
-       public CompatibilityResult<ArrayList<T>> 
ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-               if (configSnapshot instanceof 
CollectionSerializerConfigSnapshot) {
-                       Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> 
previousElemSerializerAndConfig =
-                               ((CollectionSerializerConfigSnapshot<?, ?>) 
configSnapshot).getSingleNestedSerializerAndConfig();
-
-                       CompatibilityResult<T> compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
-                                       previousElemSerializerAndConfig.f0,
-                                       UnloadableDummyTypeSerializer.class,
-                                       previousElemSerializerAndConfig.f1,
-                                       elementSerializer);
-
-                       if (!compatResult.isRequiresMigration()) {
-                               return CompatibilityResult.compatible();
-                       } else if (compatResult.getConvertDeserializer() != 
null) {
-                               return CompatibilityResult.requiresMigration(
-                                       new ArrayListSerializer<>(new 
TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
-                       }
+       public TypeSerializerSchemaCompatibility<ArrayList<T>> 
resolveSchemaCompatibilityViaRedirectingToNewSnapshotClass(
+                       TypeSerializerConfigSnapshot<ArrayList<T>> 
deprecatedConfigSnapshot) {
+
+               if (deprecatedConfigSnapshot instanceof 
CollectionSerializerConfigSnapshot) {
+                       CollectionSerializerConfigSnapshot<ArrayList<T>, T> 
castedLegacySnapshot =
+                               
(CollectionSerializerConfigSnapshot<ArrayList<T>, T>) deprecatedConfigSnapshot;
+
+                       ArrayListSerializerSnapshot<T> newSnapshot = new 
ArrayListSerializerSnapshot<>();
+                       return 
CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+                               this,
+                               newSnapshot,
+                               
castedLegacySnapshot.getNestedSerializerSnapshots());
                }
 
-               return CompatibilityResult.requiresMigration();
+               return TypeSerializerSchemaCompatibility.incompatible();
        }
 }

Reply via email to