This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 817485473005dfd054467eab4c2a275d833af723
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
AuthorDate: Wed Dec 5 15:55:23 2018 +0800

    [FLINK-11079] [core] Let ListSerializerSnapshot extend CompositeTypeSerializerSnapshot
---
 .../base/CollectionSerializerConfigSnapshot.java   |  9 ++--
 .../api/common/typeutils/base/ListSerializer.java  |  2 +-
 .../typeutils/base/ListSerializerSnapshot.java     | 56 ++++++----------------
 3 files changed, 21 insertions(+), 46 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
index 377dd4c..762a441 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
@@ -49,10 +49,13 @@ public final class CollectionSerializerConfigSnapshot<C extends Collection<T>, T
        @Override
        public TypeSerializerSchemaCompatibility<C> resolveSchemaCompatibility(TypeSerializer<C> newSerializer) {
                if (newSerializer instanceof ListSerializer) {
-                       ListSerializerSnapshot<T> listSerializerSnapshot =
-                               new ListSerializerSnapshot<>(((ListSerializer<T>) newSerializer).getElementSerializer());
+                       ListSerializer<T> newListSerializer = (ListSerializer<T>) newSerializer;
+                       ListSerializerSnapshot<T> listSerializerSnapshot = new ListSerializerSnapshot<>(newListSerializer);
 
-                       return listSerializerSnapshot.resolveSchemaCompatibility((ListSerializer) newSerializer);
+                       @SuppressWarnings("unchecked")
+                       TypeSerializerSchemaCompatibility<C> result = (TypeSerializerSchemaCompatibility<C>)
+                               listSerializerSnapshot.resolveSchemaCompatibility(newListSerializer);
+                       return result;
                } else {
                        return super.resolveSchemaCompatibility(newSerializer);
                }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
index 08b3333..5178031 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -175,6 +175,6 @@ public final class ListSerializer<T> extends TypeSerializer<List<T>> {
 
        @Override
        public TypeSerializerSnapshot<List<T>> snapshotConfiguration() {
-               return new ListSerializerSnapshot<>(elementSerializer);
+               return new ListSerializerSnapshot<>(this);
        }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
index 5f89d94..f90e22a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
@@ -18,73 +18,45 @@
 
 package org.apache.flink.api.common.typeutils.base;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
 import java.util.List;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * Snapshot class for the {@link ListSerializer}.
  */
-public class ListSerializerSnapshot<T> implements TypeSerializerSnapshot<List<T>> {
+public class ListSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<List<T>, ListSerializer> {
 
        private static final int CURRENT_VERSION = 1;
 
-       private CompositeSerializerSnapshot nestedElementSerializerSnapshot;
-
        /**
         * Constructor for read instantiation.
         */
-       public ListSerializerSnapshot() {}
+       public ListSerializerSnapshot() {
+               super(ListSerializer.class);
+       }
 
        /**
         * Constructor to create the snapshot for writing.
         */
-       public ListSerializerSnapshot(TypeSerializer<T> elementSerializer) {
-               this.nestedElementSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(elementSerializer));
+       public ListSerializerSnapshot(ListSerializer<T> listSerializer) {
+               super(listSerializer);
        }
 
        @Override
-       public int getCurrentVersion() {
+       public int getCurrentOuterSnapshotVersion() {
                return CURRENT_VERSION;
        }
 
        @Override
-       public TypeSerializer<List<T>> restoreSerializer() {
-               return new ListSerializer<>(nestedElementSerializerSnapshot.getRestoreSerializer(0));
-       }
-
-       @Override
-       public TypeSerializerSchemaCompatibility<List<T>> resolveSchemaCompatibility(TypeSerializer<List<T>> newSerializer) {
-               checkState(nestedElementSerializerSnapshot != null);
-
-               if (newSerializer instanceof ListSerializer) {
-                       ListSerializer<T> serializer = (ListSerializer<T>) newSerializer;
-
-                       return nestedElementSerializerSnapshot.resolveCompatibilityWithNested(
-                               TypeSerializerSchemaCompatibility.compatibleAsIs(),
-                               serializer.getElementSerializer());
-               }
-               else {
-                       return TypeSerializerSchemaCompatibility.incompatible();
-               }
-       }
-
-       @Override
-       public void writeSnapshot(DataOutputView out) throws IOException {
-               nestedElementSerializerSnapshot.writeCompositeSnapshot(out);
+       protected ListSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+               @SuppressWarnings("unchecked")
+               TypeSerializer<T> elementSerializer = (TypeSerializer<T>) nestedSerializers[0];
+               return new ListSerializer<>(elementSerializer);
        }
 
        @Override
-       public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-               this.nestedElementSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+       protected TypeSerializer<?>[] getNestedSerializers(ListSerializer outerSerializer) {
+               return new TypeSerializer<?>[] { outerSerializer.getElementSerializer() };
        }
 }

Reply via email to