This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 817485473005dfd054467eab4c2a275d833af723 Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> AuthorDate: Wed Dec 5 15:55:23 2018 +0800 [FLINK-11079] [core] Let ListSerializerSnapshot extend CompositeTypeSerializerSnapshot --- .../base/CollectionSerializerConfigSnapshot.java | 9 ++-- .../api/common/typeutils/base/ListSerializer.java | 2 +- .../typeutils/base/ListSerializerSnapshot.java | 56 ++++++---------------- 3 files changed, 21 insertions(+), 46 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java index 377dd4c..762a441 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java @@ -49,10 +49,13 @@ public final class CollectionSerializerConfigSnapshot<C extends Collection<T>, T @Override public TypeSerializerSchemaCompatibility<C> resolveSchemaCompatibility(TypeSerializer<C> newSerializer) { if (newSerializer instanceof ListSerializer) { - ListSerializerSnapshot<T> listSerializerSnapshot = - new ListSerializerSnapshot<>(((ListSerializer<T>) newSerializer).getElementSerializer()); + ListSerializer<T> newListSerializer = (ListSerializer<T>) newSerializer; + ListSerializerSnapshot<T> listSerializerSnapshot = new ListSerializerSnapshot<>(newListSerializer); - return listSerializerSnapshot.resolveSchemaCompatibility((ListSerializer) newSerializer); + @SuppressWarnings("unchecked") + TypeSerializerSchemaCompatibility<C> result = (TypeSerializerSchemaCompatibility<C>) + listSerializerSnapshot.resolveSchemaCompatibility(newListSerializer); + return result; } else { return super.resolveSchemaCompatibility(newSerializer); } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java index 08b3333..5178031 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java @@ -175,6 +175,6 @@ public final class ListSerializer<T> extends TypeSerializer<List<T>> { @Override public TypeSerializerSnapshot<List<T>> snapshotConfiguration() { - return new ListSerializerSnapshot<>(elementSerializer); + return new ListSerializerSnapshot<>(this); } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java index 5f89d94..f90e22a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java @@ -18,73 +18,45 @@ package org.apache.flink.api.common.typeutils.base; -import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.util.Preconditions; - -import java.io.IOException; import java.util.List; -import static org.apache.flink.util.Preconditions.checkState; - /** * Snapshot class for the {@link ListSerializer}. */ -public class ListSerializerSnapshot<T> implements TypeSerializerSnapshot<List<T>> { +public class ListSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<List<T>, ListSerializer> { private static final int CURRENT_VERSION = 1; - private CompositeSerializerSnapshot nestedElementSerializerSnapshot; - /** * Constructor for read instantiation. */ - public ListSerializerSnapshot() {} + public ListSerializerSnapshot() { + super(ListSerializer.class); + } /** * Constructor to create the snapshot for writing. */ - public ListSerializerSnapshot(TypeSerializer<T> elementSerializer) { - this.nestedElementSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(elementSerializer)); + public ListSerializerSnapshot(ListSerializer<T> listSerializer) { + super(listSerializer); } @Override - public int getCurrentVersion() { + public int getCurrentOuterSnapshotVersion() { return CURRENT_VERSION; } @Override - public TypeSerializer<List<T>> restoreSerializer() { - return new ListSerializer<>(nestedElementSerializerSnapshot.getRestoreSerializer(0)); - } - - @Override - public TypeSerializerSchemaCompatibility<List<T>> resolveSchemaCompatibility(TypeSerializer<List<T>> newSerializer) { - checkState(nestedElementSerializerSnapshot != null); - - if (newSerializer instanceof ListSerializer) { - ListSerializer<T> serializer = (ListSerializer<T>) newSerializer; - - return nestedElementSerializerSnapshot.resolveCompatibilityWithNested( - TypeSerializerSchemaCompatibility.compatibleAsIs(), - serializer.getElementSerializer()); - } - else { - return TypeSerializerSchemaCompatibility.incompatible(); - } - } - - @Override - public void writeSnapshot(DataOutputView out) throws IOException { - nestedElementSerializerSnapshot.writeCompositeSnapshot(out); + protected ListSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { + @SuppressWarnings("unchecked") + TypeSerializer<T> elementSerializer = (TypeSerializer<T>) nestedSerializers[0]; + return new ListSerializer<>(elementSerializer); } @Override - public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException { - this.nestedElementSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader); + protected TypeSerializer<?>[] getNestedSerializers(ListSerializer outerSerializer) { + return new TypeSerializer<?>[] { outerSerializer.getElementSerializer() }; } }