asfgit closed pull request #7422: [FLINK-11073] (part 2) Introduce 
CompositeTypeSerializerSnapshot and migrate existing composite serializers' 
snapshots
URL: https://github.com/apache/flink/pull/7422
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
new file mode 100644
index 00000000000..c73e24c0897
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A {@link CompositeTypeSerializerSnapshot} is a convenient serializer 
snapshot class that can be used by
+ * simple serializers which 1) delegate their serialization to multiple nested 
serializers, and 2) may contain
+ * some extra static information that needs to be persisted as part of their 
snapshot.
+ *
+ * <p>Examples for this would be the {@link ListSerializer}, {@link 
MapSerializer}, {@link EitherSerializer}, etc.,
+ * in which case the serializer, called the "outer" serializer in this 
context, has only some nested serializers that
+ * need to be persisted as its snapshot, and nothing else that needs to be 
persisted as the "outer" snapshot.
+ * An example which has non-empty outer snapshots would be the {@link 
GenericArraySerializer}, which beyond the
+ * nested component serializer, also contains a class of the component type 
that needs to be persisted.
+ *
+ * <p>Serializers that do have some outer snapshot need to make sure to 
implement the methods
+ * {@link #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, 
DataInputView, ClassLoader)}, and
+ * {@link #isOuterSnapshotCompatible(TypeSerializer)} when using this class as 
the base for its serializer snapshot
+ * class. By default, the base implementations of these methods are empty, 
i.e. this class assumes that
+ * subclasses do not have any outer snapshot that needs to be persisted.
+ *
+ * <h2>Snapshot Versioning</h2>
+ *
+ * <p>This base class has its own versioning for the format in which it writes 
the outer snapshot and the
+ * nested serializer snapshots. The version of the serialization format of 
this base class is defined
+ * by {@link #getCurrentVersion()}. This is independent of the version in 
which subclasses write their outer snapshot,
+ * defined by {@link #getCurrentOuterSnapshotVersion()}.
+ * This means that the outer snapshot's version can be maintained only taking 
into account changes in how the
+ * outer snapshot is written. Any changes in the base format do not require 
upticks in the outer snapshot's version.
+ *
+ * <h2>Serialization Format</h2>
+ *
+ * <p>The current version of the serialization format of a {@link 
CompositeTypeSerializerSnapshot} is as follows:
+ *
+ * <pre>{@code
+ * 
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | CompositeTypeSerializerSnapshot | CompositeTypeSerializerSnapshot |       
   Outer snapshot         |
+ * |           version               |          MAGIC_NUMBER           |       
       version            |
+ * 
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                                               Outer snapshot              
                          |
+ * |                                   #writeOuterSnapshot(DataOutputView out) 
                          |
+ * 
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |      Delegate MAGIC_NUMBER      |         Delegate version        |     
Num. nested serializers     |
+ * 
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                                     Nested serializer snapshots           
                          |
+ * 
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * }</pre>
+ *
+ * @param <T> The data type that the originating serializer of this snapshot 
serializes.
+ * @param <S> The type of the originating serializer.
+ */
+@PublicEvolving
+public abstract class CompositeTypeSerializerSnapshot<T, S extends 
TypeSerializer> implements TypeSerializerSnapshot<T> {
+
+       /** Magic number for integrity checks during deserialization. */
+       private static final int MAGIC_NUMBER = 911108;
+
+       /**
+        * Current version of the base serialization format.
+        *
+        * <p>NOTE: We start from version 3. This version is represented by the 
{@link #getCurrentVersion()} method.
+        * Previously, this method was used to represent the outer snapshot's 
version (now, represented
+        * by the {@link #getCurrentOuterSnapshotVersion()} method).
+        *
+        * <p>To bridge this transition, we set the starting version of the 
base format to be at least
+        * larger than the highest version of previously defined values in 
implementing subclasses,
+        * which was {@link #HIGHEST_LEGACY_READ_VERSION}. This allows us to 
identify legacy deserialization paths,
+        * which did not contain versioning for the base format, simply by 
checking if the read
+        * version of the snapshot is smaller than or equal to {@link 
#HIGHEST_LEGACY_READ_VERSION}.
+        */
+       private static final int VERSION = 3;
+
+       private static final int HIGHEST_LEGACY_READ_VERSION = 2;
+
+       private NestedSerializersSnapshotDelegate 
nestedSerializersSnapshotDelegate;
+
+       private final Class<S> correspondingSerializerClass;
+
+       /**
+        * Constructor to be used for read instantiation.
+        *
+        * @param correspondingSerializerClass the expected class of the new 
serializer.
+        */
+       public CompositeTypeSerializerSnapshot(Class<S> 
correspondingSerializerClass) {
+               this.correspondingSerializerClass = 
Preconditions.checkNotNull(correspondingSerializerClass);
+       }
+
+       /**
+        * Constructor to be used for writing the snapshot.
+        *
+        * @param serializerInstance an instance of the originating serializer 
of this snapshot.
+        */
+       @SuppressWarnings("unchecked")
+       public CompositeTypeSerializerSnapshot(S serializerInstance) {
+               Preconditions.checkNotNull(serializerInstance);
+               this.nestedSerializersSnapshotDelegate = new 
NestedSerializersSnapshotDelegate(getNestedSerializers(serializerInstance));
+               this.correspondingSerializerClass = (Class<S>) 
serializerInstance.getClass();
+       }
+
+       @Override
+       public final int getCurrentVersion() {
+               return VERSION;
+       }
+
+       @Override
+       public final void writeSnapshot(DataOutputView out) throws IOException {
+               internalWriteOuterSnapshot(out);
+               
nestedSerializersSnapshotDelegate.writeNestedSerializerSnapshots(out);
+       }
+
+       @Override
+       public final void readSnapshot(int readVersion, DataInputView in, 
ClassLoader userCodeClassLoader) throws IOException {
+               if (readVersion > HIGHEST_LEGACY_READ_VERSION) {
+                       internalReadOuterSnapshot(in, userCodeClassLoader);
+               } else {
+                       legacyInternalReadOuterSnapshot(readVersion, in, 
userCodeClassLoader);
+               }
+               this.nestedSerializersSnapshotDelegate = 
NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(in, 
userCodeClassLoader);
+       }
+
+       @Override
+       public final TypeSerializerSchemaCompatibility<T> 
resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
+               if (newSerializer.getClass() != correspondingSerializerClass) {
+                       return TypeSerializerSchemaCompatibility.incompatible();
+               }
+
+               S castedNewSerializer = 
correspondingSerializerClass.cast(newSerializer);
+
+               // check that outer configuration is compatible; if not, short 
circuit result
+               if (!isOuterSnapshotCompatible(castedNewSerializer)) {
+                       return TypeSerializerSchemaCompatibility.incompatible();
+               }
+
+               // since outer configuration is compatible, the final 
compatibility result depends only on the nested serializers
+               return constructFinalSchemaCompatibilityResult(
+                       getNestedSerializers(castedNewSerializer),
+                       
nestedSerializersSnapshotDelegate.getNestedSerializerSnapshots());
+       }
+
+       @Override
+       public final TypeSerializer<T> restoreSerializer() {
+               @SuppressWarnings("unchecked")
+               TypeSerializer<T> serializer = (TypeSerializer<T>)
+                       
createOuterSerializerWithNestedSerializers(nestedSerializersSnapshotDelegate.getRestoredNestedSerializers());
+
+               return serializer;
+       }
+
+       // 
------------------------------------------------------------------------------------------
+       //  Outer serializer access methods
+       // 
------------------------------------------------------------------------------------------
+
+       /**
+        * Returns the version of the current outer snapshot's written binary 
format.
+        *
+        * @return the version of the current outer snapshot's written binary 
format.
+        */
+       protected abstract int getCurrentOuterSnapshotVersion();
+
+       /**
+        * Gets the nested serializers from the outer serializer.
+        *
+        * @param outerSerializer the outer serializer.
+        *
+        * @return the nested serializers.
+        */
+       protected abstract TypeSerializer<?>[] getNestedSerializers(S 
outerSerializer);
+
+       /**
+        * Creates an instance of the outer serializer with a given array of 
its nested serializers.
+        *
+        * @param nestedSerializers array of nested serializers to create the 
outer serializer with.
+        *
+        * @return an instance of the outer serializer.
+        */
+       protected abstract S 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers);
+
+       // 
------------------------------------------------------------------------------------------
+       //  Outer snapshot methods; need to be overridden if outer snapshot is 
not empty,
+       //  or in other words, the outer serializer has extra configuration 
beyond its nested serializers.
+       // 
------------------------------------------------------------------------------------------
+
+       /**
+        * Writes the outer snapshot, i.e. any information beyond the nested 
serializers of the outer serializer.
+        *
+        * <p>The base implementation of this method writes nothing, i.e. it 
assumes that the outer serializer
+        * only has nested serializers and no extra information. Otherwise, if 
the outer serializer contains
+        * some extra information that needs to be persisted as part of the 
serializer snapshot, this
+        * must be overridden. Note that this method and the corresponding 
methods
+        * {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}, {@link 
#isOuterSnapshotCompatible(TypeSerializer)}
+        * need to be implemented.
+        *
+        * @param out the {@link DataOutputView} to write the outer snapshot to.
+        */
+       protected void writeOuterSnapshot(DataOutputView out) throws 
IOException {}
+
+       /**
+        * Reads the outer snapshot, i.e. any information beyond the nested 
serializers of the outer serializer.
+        *
+        * <p>The base implementation of this method reads nothing, i.e. it 
assumes that the outer serializer
+        * only has nested serializers and no extra information. Otherwise, if 
the outer serializer contains
+        * some extra information that has been persisted as part of the 
serializer snapshot, this
+        * must be overridden. Note that this method and the corresponding 
methods
+        * {@link #writeOuterSnapshot(DataOutputView)}, {@link 
#isOuterSnapshotCompatible(TypeSerializer)}
+        * need to be implemented.
+        *
+        * @param readOuterSnapshotVersion the read version of the outer 
snapshot.
+        * @param in the {@link DataInputView} to read the outer snapshot from.
+        * @param userCodeClassLoader the user code class loader.
+        */
+       protected void readOuterSnapshot(int readOuterSnapshotVersion, 
DataInputView in, ClassLoader userCodeClassLoader) throws IOException {}
+
+       /**
+        * Checks whether the outer snapshot is compatible with a given new 
serializer.
+        *
+        * <p>The base implementation of this method just returns {@code true}, 
i.e. it assumes that the outer serializer
+        * only has nested serializers and no extra information, and therefore 
the result of the check must always
+        * be true. Otherwise, if the outer serializer contains
+        * some extra information that has been persisted as part of the 
serializer snapshot, this
+        * must be overridden. Note that this method and the corresponding 
methods
+        * {@link #writeOuterSnapshot(DataOutputView)}, {@link 
#readOuterSnapshot(int, DataInputView, ClassLoader)}
+        * need to be implemented.
+        *
+        * @param newSerializer the new serializer, which contains the new 
outer information to check against.
+        *
+        * @return a flag indicating whether or not the new serializer's outer 
information is compatible with the one
+        *         written in this snapshot.
+        */
+       protected boolean isOuterSnapshotCompatible(S newSerializer) {
+               return true;
+       }
+
+       // 
------------------------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------------------------
+
+       private void internalWriteOuterSnapshot(DataOutputView out) throws 
IOException {
+               out.writeInt(MAGIC_NUMBER);
+               out.writeInt(getCurrentOuterSnapshotVersion());
+
+               writeOuterSnapshot(out);
+       }
+
+       private void internalReadOuterSnapshot(DataInputView in, ClassLoader 
userCodeClassLoader) throws IOException {
+               final int magicNumber = in.readInt();
+               if (magicNumber != MAGIC_NUMBER) {
+                       throw new IOException(String.format("Corrupt data, 
magic number mismatch. Expected %8x, found %8x",
+                               MAGIC_NUMBER, magicNumber));
+               }
+
+               final int outerSnapshotVersion = in.readInt();
+               readOuterSnapshot(outerSnapshotVersion, in, 
userCodeClassLoader);
+       }
+
+       private void legacyInternalReadOuterSnapshot(
+                       int legacyReadVersion, DataInputView in, ClassLoader 
userCodeClassLoader) throws IOException {
+
+               // legacy versions did not contain the pre-fixed magic numbers; 
just read the outer snapshot
+               readOuterSnapshot(legacyReadVersion, in, userCodeClassLoader);
+       }
+
+       private TypeSerializerSchemaCompatibility<T> 
constructFinalSchemaCompatibilityResult(
+                       TypeSerializer<?>[] newNestedSerializers,
+                       TypeSerializerSnapshot<?>[] nestedSerializerSnapshots) {
+
+               Preconditions.checkArgument(newNestedSerializers.length == 
nestedSerializerSnapshots.length,
+                       "Different number of new serializers and existing 
serializer snapshots.");
+
+               TypeSerializer<?>[] reconfiguredNestedSerializers = new 
TypeSerializer[newNestedSerializers.length];
+
+               // check nested serializers for compatibility
+               boolean nestedSerializerRequiresMigration = false;
+               boolean hasReconfiguredNestedSerializers = false;
+               for (int i = 0; i < nestedSerializerSnapshots.length; i++) {
+                       TypeSerializerSchemaCompatibility<?> compatibility =
+                               resolveCompatibility(newNestedSerializers[i], 
nestedSerializerSnapshots[i]);
+
+                       // if any one of the new nested serializers is 
incompatible, we can just short circuit the result
+                       if (compatibility.isIncompatible()) {
+                               return 
TypeSerializerSchemaCompatibility.incompatible();
+                       }
+
+                       if (compatibility.isCompatibleAfterMigration()) {
+                               nestedSerializerRequiresMigration = true;
+                       } else if 
(compatibility.isCompatibleWithReconfiguredSerializer()) {
+                               hasReconfiguredNestedSerializers = true;
+                               reconfiguredNestedSerializers[i] = 
compatibility.getReconfiguredSerializer();
+                       } else if (compatibility.isCompatibleAsIs()) {
+                               reconfiguredNestedSerializers[i] = 
newNestedSerializers[i];
+                       } else {
+                               throw new IllegalStateException("Undefined 
compatibility type.");
+                       }
+               }
+
+               if (nestedSerializerRequiresMigration) {
+                       return 
TypeSerializerSchemaCompatibility.compatibleAfterMigration();
+               }
+
+               if (hasReconfiguredNestedSerializers) {
+                       @SuppressWarnings("unchecked")
+                       TypeSerializer<T> reconfiguredCompositeSerializer = 
createOuterSerializerWithNestedSerializers(reconfiguredNestedSerializers);
+                       return 
TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(reconfiguredCompositeSerializer);
+               }
+
+               // ends up here if everything is compatible as is
+               return TypeSerializerSchemaCompatibility.compatibleAsIs();
+       }
+
+       @SuppressWarnings("unchecked")
+       private static <E> TypeSerializerSchemaCompatibility<E> 
resolveCompatibility(
+               TypeSerializer<?> serializer,
+               TypeSerializerSnapshot<?> snapshot) {
+
+               TypeSerializer<E> typedSerializer = (TypeSerializer<E>) 
serializer;
+               TypeSerializerSnapshot<E> typedSnapshot = 
(TypeSerializerSnapshot<E>) snapshot;
+
+               return 
typedSnapshot.resolveSchemaCompatibility(typedSerializer);
+       }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializerSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/NestedSerializersSnapshotDelegate.java
similarity index 79%
rename from 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializerSnapshot.java
rename to 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/NestedSerializersSnapshotDelegate.java
index 93f5a703257..a4dcdd2cda9 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializerSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/NestedSerializersSnapshotDelegate.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.common.typeutils;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -30,21 +30,21 @@
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
- * A CompositeSerializerSnapshot represents the snapshots of multiple 
serializers that are used
+ * A NestedSerializersSnapshotDelegate represents the snapshots of multiple 
serializers that are used
  * by an outer serializer. Examples would be tuples, where the outer 
serializer is the tuple
- * format serializer, an the CompositeSerializerSnapshot holds the serializers 
for the
+ * format serializer, and the NestedSerializersSnapshotDelegate holds the 
serializers for the
  * different tuple fields.
  *
- * <p>The CompositeSerializerSnapshot does not implement the {@link 
TypeSerializerSnapshot} interface.
+ * <p>The NestedSerializersSnapshotDelegate does not implement the {@link 
TypeSerializerSnapshot} interface.
  * It is not meant to be inherited from, but to be composed with a serializer 
snapshot implementation.
  *
- * <p>The CompositeSerializerSnapshot has its own versioning internally, it 
does not couple its
+ * <p>The NestedSerializersSnapshotDelegate has its own versioning internally, 
it does not couple its
  * versioning to the versioning of the TypeSerializerSnapshot that builds on 
top of this class.
- * That way, the CompositeSerializerSnapshot and enclosing 
TypeSerializerSnapshot the can evolve
+ * That way, the NestedSerializersSnapshotDelegate and enclosing 
TypeSerializerSnapshot can evolve
  * their formats independently.
  */
-@PublicEvolving
-public class CompositeSerializerSnapshot {
+@Internal
+public class NestedSerializersSnapshotDelegate {
 
        /** Magic number for integrity checks during deserialization. */
        private static final int MAGIC_NUMBER = 1333245;
@@ -58,14 +58,14 @@
        /**
         * Constructor to create a snapshot for writing.
         */
-       public CompositeSerializerSnapshot(TypeSerializer<?>... serializers) {
+       public NestedSerializersSnapshotDelegate(TypeSerializer<?>... 
serializers) {
                this.nestedSnapshots = 
TypeSerializerUtils.snapshotBackwardsCompatible(serializers);
        }
 
        /**
         * Constructor to create a snapshot during deserialization.
         */
-       private CompositeSerializerSnapshot(TypeSerializerSnapshot<?>[] 
snapshots) {
+       private NestedSerializersSnapshotDelegate(TypeSerializerSnapshot<?>[] 
snapshots) {
                this.nestedSnapshots = snapshots;
        }
 
@@ -77,14 +77,14 @@ private 
CompositeSerializerSnapshot(TypeSerializerSnapshot<?>[] snapshots) {
         * Produces a restore serializer from each contained serializer 
configuration snapshot.
         * The serializers are returned in the same order as the snapshots are 
stored.
         */
-       public TypeSerializer<?>[] getRestoreSerializers() {
+       public TypeSerializer<?>[] getRestoredNestedSerializers() {
                return snapshotsToRestoreSerializers(nestedSnapshots);
        }
 
        /**
         * Creates the restore serializer from the pos-th config snapshot.
         */
-       public <T> TypeSerializer<T> getRestoreSerializer(int pos) {
+       public <T> TypeSerializer<T> getRestoredNestedSerializer(int pos) {
                checkArgument(pos < nestedSnapshots.length);
 
                @SuppressWarnings("unchecked")
@@ -93,10 +93,23 @@ private 
CompositeSerializerSnapshot(TypeSerializerSnapshot<?>[] snapshots) {
                return snapshot.restoreSerializer();
        }
 
+       /**
+        * Returns the snapshots of the nested serializers.
+        *
+        * @return the snapshots of the nested serializers.
+        */
+       public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() {
+               return nestedSnapshots;
+       }
+
        /**
         * Resolves the compatibility of the nested serializer snapshots with 
the nested
         * serializers of the new outer serializer.
+        *
+        * @deprecated this method will be removed in the future. Resolving 
compatibility for nested
+        *             serializers is now handled by {@link 
CompositeTypeSerializerSnapshot}.
         */
+       @Deprecated
        public <T> TypeSerializerSchemaCompatibility<T> 
resolveCompatibilityWithNested(
                        TypeSerializerSchemaCompatibility<?> outerCompatibility,
                        TypeSerializer<?>... newNestedSerializers) {
@@ -135,7 +148,7 @@ private 
CompositeSerializerSnapshot(TypeSerializerSnapshot<?>[] snapshots) {
        /**
         * Writes the composite snapshot of all the contained serializers.
         */
-       public final void writeCompositeSnapshot(DataOutputView out) throws 
IOException {
+       public final void writeNestedSerializerSnapshots(DataOutputView out) 
throws IOException {
                out.writeInt(MAGIC_NUMBER);
                out.writeInt(VERSION);
 
@@ -148,7 +161,7 @@ public final void writeCompositeSnapshot(DataOutputView 
out) throws IOException
        /**
         * Reads the composite snapshot of all the contained serializers.
         */
-       public static CompositeSerializerSnapshot 
readCompositeSnapshot(DataInputView in, ClassLoader cl) throws IOException {
+       public static NestedSerializersSnapshotDelegate 
readNestedSerializerSnapshots(DataInputView in, ClassLoader cl) throws 
IOException {
                final int magicNumber = in.readInt();
                if (magicNumber != MAGIC_NUMBER) {
                        throw new IOException(String.format("Corrupt data, 
magic number mismatch. Expected %8x, found %8x",
@@ -167,14 +180,14 @@ public static CompositeSerializerSnapshot 
readCompositeSnapshot(DataInputView in
                        nestedSnapshots[i] = 
TypeSerializerSnapshot.readVersionedSnapshot(in, cl);
                }
 
-               return new CompositeSerializerSnapshot(nestedSnapshots);
+               return new NestedSerializersSnapshotDelegate(nestedSnapshots);
        }
 
        /**
         * Reads the composite snapshot of all the contained serializers in a 
way that is compatible
         * with Version 1 of the deprecated {@link 
CompositeTypeSerializerConfigSnapshot}.
         */
-       public static CompositeSerializerSnapshot 
legacyReadProductSnapshots(DataInputView in, ClassLoader cl) throws IOException 
{
+       public static NestedSerializersSnapshotDelegate 
legacyReadNestedSerializerSnapshots(DataInputView in, ClassLoader cl) throws 
IOException {
                @SuppressWarnings("deprecation")
                final List<Tuple2<TypeSerializer<?>, 
TypeSerializerSnapshot<?>>> serializersAndSnapshots =
                                
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, cl);
@@ -183,7 +196,7 @@ public static CompositeSerializerSnapshot 
legacyReadProductSnapshots(DataInputVi
                                .map(t -> t.f1)
                                .toArray(TypeSerializerSnapshot<?>[]::new);
 
-               return new CompositeSerializerSnapshot(nestedSnapshots);
+               return new NestedSerializersSnapshotDelegate(nestedSnapshots);
        }
 
        // 
------------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
index 377dd4cc198..762a4410c85 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
@@ -49,10 +49,13 @@ public CollectionSerializerConfigSnapshot(TypeSerializer<T> 
elementSerializer) {
        @Override
        public TypeSerializerSchemaCompatibility<C> 
resolveSchemaCompatibility(TypeSerializer<C> newSerializer) {
                if (newSerializer instanceof ListSerializer) {
-                       ListSerializerSnapshot<T> listSerializerSnapshot =
-                               new 
ListSerializerSnapshot<>(((ListSerializer<T>) 
newSerializer).getElementSerializer());
+                       ListSerializer<T> newListSerializer = 
(ListSerializer<T>) newSerializer;
+                       ListSerializerSnapshot<T> listSerializerSnapshot = new 
ListSerializerSnapshot<>(newListSerializer);
 
-                       return 
listSerializerSnapshot.resolveSchemaCompatibility((ListSerializer) 
newSerializer);
+                       @SuppressWarnings("unchecked")
+                       TypeSerializerSchemaCompatibility<C> result = 
(TypeSerializerSchemaCompatibility<C>)
+                               
listSerializerSnapshot.resolveSchemaCompatibility(newListSerializer);
+                       return result;
                } else {
                        return super.resolveSchemaCompatibility(newSerializer);
                }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index 55ba8ab477f..a4949fb6fa7 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -206,7 +206,7 @@ public String toString() {
        // 
--------------------------------------------------------------------------------------------
 
        @Override
-       public GenericArraySerializerConfigSnapshot<C> snapshotConfiguration() {
-               return new GenericArraySerializerConfigSnapshot<>(this);
+       public GenericArraySerializerSnapshot<C> snapshotConfiguration() {
+               return new GenericArraySerializerSnapshot<>(this);
        }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
index b0aa24128d2..cfc2e9825d7 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -38,8 +38,12 @@
  * Point-in-time configuration of a {@link GenericArraySerializer}.
  *
  * @param <C> The component type.
+ *
+ * @deprecated this is deprecated and no longer used by the {@link 
GenericArraySerializer}.
+ *             It has been replaced by {@link GenericArraySerializerSnapshot}.
  */
 @Internal
+@Deprecated
 public final class GenericArraySerializerConfigSnapshot<C> implements 
TypeSerializerSnapshot<C[]> {
 
        private static final int CURRENT_VERSION = 2;
@@ -50,7 +54,7 @@
 
        /** Snapshot handling for the component serializer snapshot. */
        @Nullable
-       private CompositeSerializerSnapshot nestedSnapshot;
+       private NestedSerializersSnapshotDelegate nestedSnapshot;
 
        /**
         * Constructor for read instantiation.
@@ -63,7 +67,7 @@ public GenericArraySerializerConfigSnapshot() {}
         */
        public GenericArraySerializerConfigSnapshot(GenericArraySerializer<C> 
serializer) {
                this.componentClass = serializer.getComponentClass();
-               this.nestedSnapshot = new 
CompositeSerializerSnapshot(serializer.getComponentSerializer());
+               this.nestedSnapshot = new 
NestedSerializersSnapshotDelegate(serializer.getComponentSerializer());
        }
 
        // 
------------------------------------------------------------------------
@@ -77,7 +81,7 @@ public int getCurrentVersion() {
        public void writeSnapshot(DataOutputView out) throws IOException {
                checkState(componentClass != null && nestedSnapshot != null);
                out.writeUTF(componentClass.getName());
-               nestedSnapshot.writeCompositeSnapshot(out);
+               nestedSnapshot.writeNestedSerializerSnapshots(out);
        }
 
        @Override
@@ -95,7 +99,7 @@ public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoa
        }
 
        private void readV1(DataInputView in, ClassLoader classLoader) throws 
IOException {
-               nestedSnapshot = 
CompositeSerializerSnapshot.legacyReadProductSnapshots(in, classLoader);
+               nestedSnapshot = 
NestedSerializersSnapshotDelegate.legacyReadNestedSerializerSnapshots(in, 
classLoader);
 
                try (DataInputViewStream inViewWrapper = new 
DataInputViewStream(in)) {
                        componentClass = 
InstantiationUtil.deserializeObject(inViewWrapper, classLoader);
@@ -107,29 +111,23 @@ private void readV1(DataInputView in, ClassLoader classLoader) throws IOExceptio
 
        private void readV2(DataInputView in, ClassLoader classLoader) throws 
IOException {
                componentClass = InstantiationUtil.resolveClassByName(in, 
classLoader);
-               nestedSnapshot = 
CompositeSerializerSnapshot.readCompositeSnapshot(in, classLoader);
+               nestedSnapshot = 
NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(in, 
classLoader);
        }
 
        @Override
        public TypeSerializer<C[]> restoreSerializer() {
                checkState(componentClass != null && nestedSnapshot != null);
-               return new GenericArraySerializer<>(componentClass, 
nestedSnapshot.getRestoreSerializer(0));
+               return new GenericArraySerializer<>(componentClass, 
nestedSnapshot.getRestoredNestedSerializer(0));
        }
 
        @Override
        public TypeSerializerSchemaCompatibility<C[]> 
resolveSchemaCompatibility(TypeSerializer<C[]> newSerializer) {
-               checkState(componentClass != null && nestedSnapshot != null);
-
                if (newSerializer instanceof GenericArraySerializer) {
-                       GenericArraySerializer<C> serializer = 
(GenericArraySerializer<C>) newSerializer;
-                       TypeSerializerSchemaCompatibility<C> compat = 
serializer.getComponentClass() == componentClass ?
-                                       
TypeSerializerSchemaCompatibility.compatibleAsIs() :
-                                       
TypeSerializerSchemaCompatibility.incompatible();
-
-                       return nestedSnapshot.resolveCompatibilityWithNested(
-                                       compat, 
serializer.getComponentSerializer());
-               }
-               else {
+                       // delegate to the new snapshot class
+                       GenericArraySerializer<C> castedNewSerializer = 
(GenericArraySerializer<C>) newSerializer;
+                       GenericArraySerializerSnapshot<C> newSnapshot = new 
GenericArraySerializerSnapshot<>(castedNewSerializer);
+                       return 
newSnapshot.resolveSchemaCompatibility(castedNewSerializer);
+               } else {
                        return TypeSerializerSchemaCompatibility.incompatible();
                }
        }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
new file mode 100644
index 00000000000..3f54dee67cb
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+
+/**
+ * Point-in-time configuration of a {@link GenericArraySerializer}.
+ *
+ * @param <C> The component type.
+ */
+public final class GenericArraySerializerSnapshot<C> extends 
CompositeTypeSerializerSnapshot<C[], GenericArraySerializer> {
+
+       private static final int CURRENT_VERSION = 1;
+
+       private Class<C> componentClass;
+
+       /**
+        * Constructor to be used for read instantiation.
+        */
+       public GenericArraySerializerSnapshot() {
+               super(GenericArraySerializer.class);
+       }
+
+       /**
+        * Constructor to be used for writing the snapshot.
+        */
+       public GenericArraySerializerSnapshot(GenericArraySerializer<C> 
genericArraySerializer) {
+               super(genericArraySerializer);
+               this.componentClass = 
genericArraySerializer.getComponentClass();
+       }
+
+       @Override
+       protected int getCurrentOuterSnapshotVersion() {
+               return CURRENT_VERSION;
+       }
+
+       @Override
+       protected void writeOuterSnapshot(DataOutputView out) throws 
IOException {
+               out.writeUTF(componentClass.getName());
+       }
+
+       @Override
+       protected void readOuterSnapshot(int readOuterSnapshotVersion, 
DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+               this.componentClass = InstantiationUtil.resolveClassByName(in, 
userCodeClassLoader);
+       }
+
+       @Override
+       protected GenericArraySerializer 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
+               @SuppressWarnings("unchecked")
+               TypeSerializer<C> componentSerializer = (TypeSerializer<C>) 
nestedSerializers[0];
+               return new GenericArraySerializer<>(componentClass, 
componentSerializer);
+       }
+
+       @Override
+       protected TypeSerializer<?>[] 
getNestedSerializers(GenericArraySerializer outerSerializer) {
+               return new TypeSerializer<?>[] { 
outerSerializer.getComponentSerializer() };
+       }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
index 08b33334653..51780319036 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -175,6 +175,6 @@ public int hashCode() {
 
        @Override
        public TypeSerializerSnapshot<List<T>> snapshotConfiguration() {
-               return new ListSerializerSnapshot<>(elementSerializer);
+               return new ListSerializerSnapshot<>(this);
        }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
index 5f89d94973f..f90e22a651a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
@@ -18,73 +18,45 @@
 
 package org.apache.flink.api.common.typeutils.base;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
 import java.util.List;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * Snapshot class for the {@link ListSerializer}.
  */
-public class ListSerializerSnapshot<T> implements 
TypeSerializerSnapshot<List<T>> {
+public class ListSerializerSnapshot<T> extends 
CompositeTypeSerializerSnapshot<List<T>, ListSerializer> {
 
        private static final int CURRENT_VERSION = 1;
 
-       private CompositeSerializerSnapshot nestedElementSerializerSnapshot;
-
        /**
         * Constructor for read instantiation.
         */
-       public ListSerializerSnapshot() {}
+       public ListSerializerSnapshot() {
+               super(ListSerializer.class);
+       }
 
        /**
         * Constructor to create the snapshot for writing.
         */
-       public ListSerializerSnapshot(TypeSerializer<T> elementSerializer) {
-               this.nestedElementSerializerSnapshot = new 
CompositeSerializerSnapshot(Preconditions.checkNotNull(elementSerializer));
+       public ListSerializerSnapshot(ListSerializer<T> listSerializer) {
+               super(listSerializer);
        }
 
        @Override
-       public int getCurrentVersion() {
+       public int getCurrentOuterSnapshotVersion() {
                return CURRENT_VERSION;
        }
 
        @Override
-       public TypeSerializer<List<T>> restoreSerializer() {
-               return new 
ListSerializer<>(nestedElementSerializerSnapshot.getRestoreSerializer(0));
-       }
-
-       @Override
-       public TypeSerializerSchemaCompatibility<List<T>> 
resolveSchemaCompatibility(TypeSerializer<List<T>> newSerializer) {
-               checkState(nestedElementSerializerSnapshot != null);
-
-               if (newSerializer instanceof ListSerializer) {
-                       ListSerializer<T> serializer = (ListSerializer<T>) 
newSerializer;
-
-                       return 
nestedElementSerializerSnapshot.resolveCompatibilityWithNested(
-                               
TypeSerializerSchemaCompatibility.compatibleAsIs(),
-                               serializer.getElementSerializer());
-               }
-               else {
-                       return TypeSerializerSchemaCompatibility.incompatible();
-               }
-       }
-
-       @Override
-       public void writeSnapshot(DataOutputView out) throws IOException {
-               nestedElementSerializerSnapshot.writeCompositeSnapshot(out);
+       protected ListSerializer 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
+               @SuppressWarnings("unchecked")
+               TypeSerializer<T> elementSerializer = (TypeSerializer<T>) 
nestedSerializers[0];
+               return new ListSerializer<>(elementSerializer);
        }
 
        @Override
-       public void readSnapshot(int readVersion, DataInputView in, ClassLoader 
userCodeClassLoader) throws IOException {
-               this.nestedElementSerializerSnapshot = 
CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+       protected TypeSerializer<?>[] getNestedSerializers(ListSerializer 
outerSerializer) {
+               return new TypeSerializer<?>[] { 
outerSerializer.getElementSerializer() };
        }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
index dd3b81bade7..bedaf693608 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
@@ -202,6 +202,6 @@ public int hashCode() {
 
        @Override
        public TypeSerializerSnapshot<Map<K, V>> snapshotConfiguration() {
-               return new MapSerializerSnapshot<>(keySerializer, 
valueSerializer);
+               return new MapSerializerSnapshot<>(this);
        }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
index 000924f0245..2b78b527f7e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
@@ -50,8 +50,7 @@ public MapSerializerConfigSnapshot(TypeSerializer<K> keySerializer, TypeSerializ
                        // redirect the compatibility check to the new 
MapSerializerConfigSnapshot
                        MapSerializer<K, V> mapSerializer = (MapSerializer<K, 
V>) newSerializer;
 
-                       MapSerializerSnapshot<K, V> mapSerializerSnapshot =
-                               new 
MapSerializerSnapshot<>(mapSerializer.getKeySerializer(), 
mapSerializer.getValueSerializer());
+                       MapSerializerSnapshot<K, V> mapSerializerSnapshot = new 
MapSerializerSnapshot<>(mapSerializer);
                        return 
mapSerializerSnapshot.resolveSchemaCompatibility(newSerializer);
                }
                else {
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
index be2e4b0cbe6..a6db0ef74e6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
@@ -18,78 +18,50 @@
 
 package org.apache.flink.api.common.typeutils.base;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.util.Map;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * Snapshot class for the {@link MapSerializer}.
  */
-public class MapSerializerSnapshot<K, V> implements 
TypeSerializerSnapshot<Map<K, V>> {
+public class MapSerializerSnapshot<K, V> extends 
CompositeTypeSerializerSnapshot<Map<K, V>, MapSerializer> {
 
        private static final int CURRENT_VERSION = 1;
 
-       private CompositeSerializerSnapshot nestedKeyValueSerializerSnapshot;
-
        /**
         * Constructor for read instantiation.
         */
-       public MapSerializerSnapshot() {}
+       public MapSerializerSnapshot() {
+               super(MapSerializer.class);
+       }
 
        /**
         * Constructor to create the snapshot for writing.
         */
-       public MapSerializerSnapshot(TypeSerializer<K> keySerializer, 
TypeSerializer<V> valueSerializer) {
-               Preconditions.checkNotNull(keySerializer);
-               Preconditions.checkNotNull(valueSerializer);
-               this.nestedKeyValueSerializerSnapshot = new 
CompositeSerializerSnapshot(keySerializer, valueSerializer);
+       public MapSerializerSnapshot(MapSerializer<K, V> mapSerializer) {
+               super(mapSerializer);
        }
 
        @Override
-       public int getCurrentVersion() {
+       public int getCurrentOuterSnapshotVersion() {
                return CURRENT_VERSION;
        }
 
        @Override
-       public TypeSerializer<Map<K, V>> restoreSerializer() {
-               return new MapSerializer<>(
-                       
nestedKeyValueSerializerSnapshot.getRestoreSerializer(0),
-                       
nestedKeyValueSerializerSnapshot.getRestoreSerializer(1));
-       }
+       protected MapSerializer 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
+               @SuppressWarnings("unchecked")
+               TypeSerializer<K> keySerializer = (TypeSerializer<K>) 
nestedSerializers[0];
 
-       @Override
-       public TypeSerializerSchemaCompatibility<Map<K, V>> 
resolveSchemaCompatibility(TypeSerializer<Map<K, V>> newSerializer) {
-               checkState(nestedKeyValueSerializerSnapshot != null);
+               @SuppressWarnings("unchecked")
+               TypeSerializer<V> valueSerializer = (TypeSerializer<V>) 
nestedSerializers[1];
 
-               if (newSerializer instanceof MapSerializer) {
-                       MapSerializer<K, V> serializer = (MapSerializer<K, V>) 
newSerializer;
-
-                       return 
nestedKeyValueSerializerSnapshot.resolveCompatibilityWithNested(
-                               
TypeSerializerSchemaCompatibility.compatibleAsIs(),
-                               serializer.getKeySerializer(),
-                               serializer.getValueSerializer());
-               }
-               else {
-                       return TypeSerializerSchemaCompatibility.incompatible();
-               }
-       }
-
-       @Override
-       public void writeSnapshot(DataOutputView out) throws IOException {
-               nestedKeyValueSerializerSnapshot.writeCompositeSnapshot(out);
+               return new MapSerializer<>(keySerializer, valueSerializer);
        }
 
        @Override
-       public void readSnapshot(int readVersion, DataInputView in, ClassLoader 
userCodeClassLoader) throws IOException {
-               this.nestedKeyValueSerializerSnapshot = 
CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+       protected TypeSerializer<?>[] getNestedSerializers(MapSerializer 
outerSerializer) {
+               return new TypeSerializer<?>[] { 
outerSerializer.getKeySerializer(), outerSerializer.getValueSerializer() };
        }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
index 3d4e8e92762..01286400694 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
@@ -205,7 +205,7 @@ public int hashCode() {
        // 
------------------------------------------------------------------------
 
        @Override
-       public EitherSerializerSnapshot<L, R> snapshotConfiguration() {
-               return new EitherSerializerSnapshot<>(leftSerializer, 
rightSerializer);
+       public JavaEitherSerializerSnapshot<L, R> snapshotConfiguration() {
+               return new JavaEitherSerializerSnapshot<>(this);
        }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
index 016fd0430f7..1779ec8e744 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -35,15 +35,19 @@
 
 /**
  * Configuration snapshot for the {@link EitherSerializer}.
+ *
+ * @deprecated this snapshot class is no longer used by any serializers.
+ *             Instead, {@link JavaEitherSerializerSnapshot} is used.
  */
 @Internal
+@Deprecated
 public final class EitherSerializerSnapshot<L, R> implements 
TypeSerializerSnapshot<Either<L, R>> {
 
        private static final int CURRENT_VERSION = 2;
 
        /** Snapshot handling for the component serializer snapshot. */
        @Nullable
-       private CompositeSerializerSnapshot nestedSnapshot;
+       private NestedSerializersSnapshotDelegate nestedSnapshot;
 
        /**
         * Constructor for read instantiation.
@@ -58,7 +62,7 @@ public EitherSerializerSnapshot(
                        TypeSerializer<L> leftSerializer,
                        TypeSerializer<R> rightSerializer) {
 
-               this.nestedSnapshot = new 
CompositeSerializerSnapshot(leftSerializer, rightSerializer);
+               this.nestedSnapshot = new 
NestedSerializersSnapshotDelegate(leftSerializer, rightSerializer);
        }
 
        // 
------------------------------------------------------------------------
@@ -71,7 +75,7 @@ public int getCurrentVersion() {
        @Override
        public void writeSnapshot(DataOutputView out) throws IOException {
                checkState(nestedSnapshot != null);
-               nestedSnapshot.writeCompositeSnapshot(out);
+               nestedSnapshot.writeNestedSerializerSnapshots(out);
        }
 
        @Override
@@ -89,19 +93,19 @@ public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoa
        }
 
        private void readV1(DataInputView in, ClassLoader classLoader) throws 
IOException {
-               nestedSnapshot = 
CompositeSerializerSnapshot.legacyReadProductSnapshots(in, classLoader);
+               nestedSnapshot = 
NestedSerializersSnapshotDelegate.legacyReadNestedSerializerSnapshots(in, 
classLoader);
        }
 
        private void readV2(DataInputView in, ClassLoader classLoader) throws 
IOException {
-               nestedSnapshot = 
CompositeSerializerSnapshot.readCompositeSnapshot(in, classLoader);
+               nestedSnapshot = 
NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(in, 
classLoader);
        }
 
        @Override
        public TypeSerializer<Either<L, R>> restoreSerializer() {
                checkState(nestedSnapshot != null);
                return new EitherSerializer<>(
-                               nestedSnapshot.getRestoreSerializer(0),
-                               nestedSnapshot.getRestoreSerializer(1));
+                               nestedSnapshot.getRestoredNestedSerializer(0),
+                               nestedSnapshot.getRestoredNestedSerializer(1));
        }
 
        @Override
@@ -110,12 +114,10 @@ private void readV2(DataInputView in, ClassLoader classLoader) throws IOExceptio
                checkState(nestedSnapshot != null);
 
                if (newSerializer instanceof EitherSerializer) {
+                       // delegate compatibility check to the new snapshot 
class
                        EitherSerializer<L, R> serializer = 
(EitherSerializer<L, R>) newSerializer;
-
-                       return nestedSnapshot.resolveCompatibilityWithNested(
-                                       
TypeSerializerSchemaCompatibility.compatibleAsIs(),
-                                       serializer.getLeftSerializer(),
-                                       serializer.getRightSerializer());
+                       JavaEitherSerializerSnapshot<L, R> newSnapshot = new 
JavaEitherSerializerSnapshot<>(serializer);
+                       return 
newSnapshot.resolveSchemaCompatibility(serializer);
                }
                else {
                        return TypeSerializerSchemaCompatibility.incompatible();
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java
new file mode 100644
index 00000000000..503634599f8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.Either;
+
+/**
+ * Snapshot class for the {@link EitherSerializer}.
+ */
+public class JavaEitherSerializerSnapshot<L, R> extends 
CompositeTypeSerializerSnapshot<Either<L, R>, EitherSerializer> {
+
+       private static final int CURRENT_VERSION = 1;
+
+       /**
+        * Constructor for read instantiation.
+        */
+       @SuppressWarnings("unused")
+       public JavaEitherSerializerSnapshot() {
+               super(EitherSerializer.class);
+       }
+
+       /**
+        * Constructor to create the snapshot for writing.
+        */
+       public JavaEitherSerializerSnapshot(EitherSerializer<L, R> 
eitherSerializer) {
+               super(eitherSerializer);
+       }
+
+       @Override
+       protected int getCurrentOuterSnapshotVersion() {
+               return CURRENT_VERSION;
+       }
+
+       @Override
+       protected EitherSerializer 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
+               return new EitherSerializer<>(nestedSerializers[0], 
nestedSerializers[1]);
+       }
+
+       @Override
+       protected TypeSerializer<?>[] getNestedSerializers(EitherSerializer 
outerSerializer) {
+               return new TypeSerializer<?>[]{ 
outerSerializer.getLeftSerializer(), outerSerializer.getRightSerializer() };
+       }
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
index c7b002a21ec..62135d7e550 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
@@ -19,11 +19,11 @@
 package org.apache.flink.api.common.typeutils;
 
 import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
-import 
org.apache.flink.api.common.typeutils.base.GenericArraySerializerConfigSnapshot;
+import 
org.apache.flink.api.common.typeutils.base.GenericArraySerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
-import org.apache.flink.api.java.typeutils.runtime.EitherSerializerSnapshot;
+import 
org.apache.flink.api.java.typeutils.runtime.JavaEitherSerializerSnapshot;
 import org.apache.flink.types.Either;
 
 import org.junit.runner.RunWith;
@@ -48,14 +48,14 @@ public CompositeTypeSerializerSnapshotMigrationTest(TestSpecification<Object> te
 
                // Either<String, Integer>
 
-               final TestSpecification<Either<String, Integer>> either = 
TestSpecification.<Either<String, Integer>>builder("1.6-either", 
EitherSerializer.class, EitherSerializerSnapshot.class)
+               final TestSpecification<Either<String, Integer>> either = 
TestSpecification.<Either<String, Integer>>builder("1.6-either", 
EitherSerializer.class, JavaEitherSerializerSnapshot.class)
                        .withSerializerProvider(() -> new 
EitherSerializer<>(StringSerializer.INSTANCE, IntSerializer.INSTANCE))
                        
.withSnapshotDataLocation("flink-1.6-either-type-serializer-snapshot")
                        .withTestData("flink-1.6-either-type-serializer-data", 
10);
 
                // GenericArray<String>
 
-               final TestSpecification<String[]> array = 
TestSpecification.<String[]>builder("1.6-generic-array", 
GenericArraySerializer.class, GenericArraySerializerConfigSnapshot.class)
+               final TestSpecification<String[]> array = 
TestSpecification.<String[]>builder("1.6-generic-array", 
GenericArraySerializer.class, GenericArraySerializerSnapshot.class)
                        .withSerializerProvider(() -> new 
GenericArraySerializer<>(String.class, StringSerializer.INSTANCE))
                        
.withSnapshotDataLocation("flink-1.6-array-type-serializer-snapshot")
                        .withTestData("flink-1.6-array-type-serializer-data", 
10);
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
new file mode 100644
index 00000000000..0f77e3d7376
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Test suite for the {@link CompositeTypeSerializerSnapshot}.
+ */
+public class CompositeTypeSerializerSnapshotTest {
+
+       // 
------------------------------------------------------------------------------------------------
+       //  Scope: tests 
CompositeTypeSerializerSnapshot#resolveSchemaCompatibility
+       // 
------------------------------------------------------------------------------------------------
+
+       /**
+        * The nested serializers report every possible compatibility result, including
+        * an incompatible one; the composite result is expected to be incompatible.
+        */
+       @Test
+       public void testIncompatiblePrecedence() throws IOException {
+               final String outerConfig = "outer-config";
+
+               final TypeSerializerSchemaCompatibility<String> result =
+                       snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+                               new TypeSerializer<?>[] {
+                                       new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+                                       new NestedSerializer(TargetCompatibility.COMPATIBLE_AFTER_MIGRATION),
+                                       new NestedSerializer(TargetCompatibility.INCOMPATIBLE),
+                                       new NestedSerializer(TargetCompatibility.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER)
+                               },
+                               outerConfig,
+                               outerConfig);
+
+               Assert.assertTrue(result.isIncompatible());
+       }
+
+       /**
+        * No nested serializer is incompatible, but one of them requires migration;
+        * the composite result is expected to be compatible after migration.
+        */
+       @Test
+       public void testCompatibleAfterMigrationPrecedence() throws IOException {
+               final String outerConfig = "outer-config";
+
+               final TypeSerializerSchemaCompatibility<String> result =
+                       snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+                               new TypeSerializer<?>[] {
+                                       new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+                                       new NestedSerializer(TargetCompatibility.COMPATIBLE_AFTER_MIGRATION),
+                                       new NestedSerializer(TargetCompatibility.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER),
+                                       new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+                               },
+                               outerConfig,
+                               outerConfig);
+
+               Assert.assertTrue(result.isCompatibleAfterMigration());
+       }
+
+       /**
+        * Exactly one nested serializer asks for reconfiguration and all others are compatible as is;
+        * the composite result is expected to be compatible with a reconfigured serializer whose
+        * nested serializer at that position is the reconfigured instance.
+        */
+       @Test
+       public void testCompatibleWithReconfiguredSerializerPrecedence() throws IOException {
+               final String OUTER_CONFIG = "outer-config";
+               TypeSerializer<?>[] testNestedSerializers = {
+                       new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+                       new NestedSerializer(TargetCompatibility.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER),
+                       new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+               };
+
+               TypeSerializerSchemaCompatibility<String> compatibility =
+                       snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+                               testNestedSerializers,
+                               OUTER_CONFIG,
+                               OUTER_CONFIG);
+
+               Assert.assertTrue(compatibility.isCompatibleWithReconfiguredSerializer());
+
+               TestCompositeTypeSerializer reconfiguredSerializer =
+                       (TestCompositeTypeSerializer) compatibility.getReconfiguredSerializer();
+               TypeSerializer<?>[] reconfiguredNestedSerializers = reconfiguredSerializer.getNestedSerializers();
+
+               // fail with a readable message instead of an ArrayIndexOutOfBoundsException below
+               Assert.assertEquals(testNestedSerializers.length, reconfiguredNestedSerializers.length);
+
+               // only the nested serializer at index 1 should be a ReconfiguredNestedSerializer;
+               // the exact classes are compared (not instanceof), and assertSame reports the
+               // actual class if the expectation is not met
+               Assert.assertSame(NestedSerializer.class, reconfiguredNestedSerializers[0].getClass());
+               Assert.assertSame(ReconfiguredNestedSerializer.class, reconfiguredNestedSerializers[1].getClass());
+               Assert.assertSame(NestedSerializer.class, reconfiguredNestedSerializers[2].getClass());
+       }
+
+       /**
+        * Every nested serializer is compatible as is and the outer configuration is unchanged;
+        * the composite result is expected to be compatible as is.
+        */
+       @Test
+       public void testCompatibleAsIsPrecedence() throws IOException {
+               final String outerConfig = "outer-config";
+
+               final TypeSerializerSchemaCompatibility<String> result =
+                       snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+                               new TypeSerializer<?>[] {
+                                       new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+                                       new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+                               },
+                               outerConfig,
+                               outerConfig);
+
+               Assert.assertTrue(result.isCompatibleAsIs());
+       }
+
+       /**
+        * The only nested serializer is compatible as is, but the outer configuration differs
+        * between the snapshot and the new serializer; the result is expected to be incompatible.
+        */
+       @Test
+       public void testOuterSnapshotCompatibilityPrecedence() throws IOException {
+               final String initialOuterConfig = "outer-config";
+               final String incompatibleOuterConfig = "incompat-outer-config";
+
+               final TypeSerializerSchemaCompatibility<String> result =
+                       snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+                               new TypeSerializer<?>[] {
+                                       new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+                               },
+                               initialOuterConfig,
+                               incompatibleOuterConfig);
+
+               // the outer snapshot's incompatibility takes precedence over
+               // the (compatible) results of the nested serializers
+               Assert.assertTrue(result.isIncompatible());
+       }
+
+       /**
+        * Snapshots a {@link TestCompositeTypeSerializer} built with the initial outer configuration,
+        * writes the snapshot in its versioned binary form and reads it back, and then resolves the
+        * restored snapshot against a new composite serializer that uses the new outer configuration
+        * and the same nested serializer instances.
+        */
+       private TypeSerializerSchemaCompatibility<String> snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+                       TypeSerializer<?>[] testNestedSerializers,
+                       String initialOuterConfiguration,
+                       String newOuterConfiguration) throws IOException {
+               // take the snapshot of the "old" serializer
+               final TypeSerializerSnapshot<String> writtenSnapshot =
+                       new TestCompositeTypeSerializer(initialOuterConfiguration, testNestedSerializers)
+                               .snapshotConfiguration();
+
+               // round-trip the snapshot through its serialized form
+               final DataOutputSerializer serializedSnapshot = new DataOutputSerializer(128);
+               TypeSerializerSnapshot.writeVersionedSnapshot(serializedSnapshot, writtenSnapshot);
+
+               final DataInputDeserializer snapshotInput = new DataInputDeserializer(serializedSnapshot.getCopyOfBuffer());
+               final TypeSerializerSnapshot<String> restoredSnapshot = TypeSerializerSnapshot.readVersionedSnapshot(
+                       snapshotInput, Thread.currentThread().getContextClassLoader());
+
+               // check the restored snapshot against the "new" serializer
+               final TestCompositeTypeSerializer newTestSerializer =
+                       new TestCompositeTypeSerializer(newOuterConfiguration, testNestedSerializers);
+               return restoredSnapshot.resolveSchemaCompatibility(newTestSerializer);
+       }
+
+       // 
------------------------------------------------------------------------------------------------
+       //  Scope: tests CompositeTypeSerializerSnapshot#restoreSerializer
+       // 
------------------------------------------------------------------------------------------------
+
+       /**
+        * Restoring a composite serializer from its (written and re-read) snapshot must yield
+        * a composite serializer whose nested serializers were all created through the nested
+        * snapshots' restore path, i.e. are {@link RestoredNestedSerializer} instances.
+        */
+       @Test
+       public void testRestoreCompositeTypeSerializer() throws IOException {
+               // the target compatibilities of the nested serializers don't matter,
+               // because we're only testing restoreSerializer()
+               TypeSerializer<?>[] testNestedSerializers = {
+                       new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+                       new NestedSerializer(TargetCompatibility.INCOMPATIBLE),
+                       new NestedSerializer(TargetCompatibility.COMPATIBLE_AFTER_MIGRATION)
+               };
+
+               TestCompositeTypeSerializer testSerializer = new TestCompositeTypeSerializer("outer-config", testNestedSerializers);
+
+               TypeSerializerSnapshot<String> testSerializerSnapshot = testSerializer.snapshotConfiguration();
+
+               DataOutputSerializer out = new DataOutputSerializer(128);
+               TypeSerializerSnapshot.writeVersionedSnapshot(out, testSerializerSnapshot);
+
+               DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
+               testSerializerSnapshot = TypeSerializerSnapshot.readVersionedSnapshot(
+                       in, Thread.currentThread().getContextClassLoader());
+
+               // now, restore the composite type serializer
+               TestCompositeTypeSerializer restoredSerializer =
+                       (TestCompositeTypeSerializer) testSerializerSnapshot.restoreSerializer();
+               TypeSerializer<?>[] restoredNestedSerializers = restoredSerializer.getNestedSerializers();
+
+               // no nested serializer may be lost or added by the restore
+               Assert.assertEquals(testNestedSerializers.length, restoredNestedSerializers.length);
+
+               // every restored nested serializer should be exactly a RestoredNestedSerializer;
+               // assertSame reports the actual class if the expectation is not met
+               for (TypeSerializer<?> restoredNestedSerializer : restoredNestedSerializers) {
+                       Assert.assertSame(RestoredNestedSerializer.class, restoredNestedSerializer.getClass());
+               }
+       }
+
+       // 
------------------------------------------------------------------------------------------------
+       //  Test utilities
+       // 
------------------------------------------------------------------------------------------------
+
+       /**
+        * A simple composite serializer used for testing.
+        * It can be configured with an array of nested serializers, as well as
+        * outer configuration (represented as String).
+        *
+        * <p>Serialization, deserialization, copying and instance creation are delegated to
+        * {@link StringSerializer#INSTANCE}. Note that {@link #equals(Object)} and {@link #hashCode()}
+        * only take the nested serializers into account; the outer configuration is not compared.
+        */
+       public static class TestCompositeTypeSerializer extends 
TypeSerializer<String> {
+
+               private static final long serialVersionUID = 
-545688468997398105L;
+
+               private static final StringSerializer delegateSerializer = 
StringSerializer.INSTANCE;
+
+               private final String outerConfiguration;
+
+               private final TypeSerializer<?>[] nestedSerializers;
+
+               TestCompositeTypeSerializer(
+                               String outerConfiguration,
+                               TypeSerializer<?>[] nestedSerializers) {
+                       this.outerConfiguration = outerConfiguration;
+                       this.nestedSerializers = nestedSerializers;
+               }
+
+               public String getOuterConfiguration() {
+                       return outerConfiguration;
+               }
+
+               // returns the internal array itself, not a copy
+               TypeSerializer<?>[] getNestedSerializers() {
+                       return nestedSerializers;
+               }
+
+               @Override
+               public TypeSerializerSnapshot<String> snapshotConfiguration() {
+                       return new TestCompositeTypeSerializerSnapshot(this);
+               }
+
+               // 
--------------------------------------------------------------------------------
+               //  Serialization delegation
+               // 
--------------------------------------------------------------------------------
+
+               @Override
+               public String deserialize(String reuse, DataInputView source) 
throws IOException {
+                       return delegateSerializer.deserialize(reuse, source);
+               }
+
+               @Override
+               public String deserialize(DataInputView source) throws 
IOException {
+                       return delegateSerializer.deserialize(source);
+               }
+
+               @Override
+               public void serialize(String record, DataOutputView target) 
throws IOException {
+                       delegateSerializer.serialize(record, target);
+               }
+
+               @Override
+               public void copy(DataInputView source, DataOutputView target) 
throws IOException {
+                       delegateSerializer.copy(source, target);
+               }
+
+               @Override
+               public String copy(String from) {
+                       return delegateSerializer.copy(from);
+               }
+
+               @Override
+               public String copy(String from, String reuse) {
+                       return delegateSerializer.copy(from, reuse);
+               }
+
+               @Override
+               public String createInstance() {
+                       return delegateSerializer.createInstance();
+               }
+
+               @Override
+               public TypeSerializer<String> duplicate() {
+                       // hands out this very instance instead of creating a copy
+                       return this;
+               }
+
+               @Override
+               public boolean isImmutableType() {
+                       return false;
+               }
+
+               @Override
+               public int getLength() {
+                       return 0;
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       // only the nested serializers are compared; outerConfiguration is ignored
+                       if (canEqual(obj)) {
+                               return Arrays.equals(nestedSerializers, 
((TestCompositeTypeSerializer) obj).getNestedSerializers());
+                       }
+                       return false;
+               }
+
+               @Override
+               public int hashCode() {
+                       // derived from the nested serializers only, matching equals(Object)
+                       return Arrays.hashCode(nestedSerializers);
+               }
+
+               @Override
+               public boolean canEqual(Object obj) {
+                       return obj instanceof TestCompositeTypeSerializer;
+               }
+       }
+
+       /**
+        * Snapshot class for the {@link TestCompositeTypeSerializer}.
+        *
+        * <p>The outer snapshot consists of the serializer's outer configuration string;
+        * a new serializer is considered compatible on the outer level if its outer
+        * configuration equals the one captured in this snapshot.
+        */
+       public static class TestCompositeTypeSerializerSnapshot extends CompositeTypeSerializerSnapshot<String, TestCompositeTypeSerializer> {
+
+               private String outerConfiguration;
+
+               /** Constructor used when the snapshot is instantiated for reading. */
+               public TestCompositeTypeSerializerSnapshot() {
+                       super(TestCompositeTypeSerializer.class);
+               }
+
+               /** Constructor used when snapshotting the given serializer. */
+               TestCompositeTypeSerializerSnapshot(TestCompositeTypeSerializer serializer) {
+                       super(serializer);
+                       outerConfiguration = serializer.getOuterConfiguration();
+               }
+
+               @Override
+               public int getCurrentOuterSnapshotVersion() {
+                       return 1;
+               }
+
+               @Override
+               protected TypeSerializer<?>[] getNestedSerializers(TestCompositeTypeSerializer outerSerializer) {
+                       return outerSerializer.getNestedSerializers();
+               }
+
+               @Override
+               protected TestCompositeTypeSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+                       return new TestCompositeTypeSerializer(outerConfiguration, nestedSerializers);
+               }
+
+               @Override
+               protected void writeOuterSnapshot(DataOutputView out) throws IOException {
+                       out.writeUTF(outerConfiguration);
+               }
+
+               @Override
+               public void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+                       // the version handed in must be the one this snapshot class writes
+                       Assert.assertEquals(getCurrentOuterSnapshotVersion(), readOuterSnapshotVersion);
+                       outerConfiguration = in.readUTF();
+               }
+
+               @Override
+               protected boolean isOuterSnapshotCompatible(TestCompositeTypeSerializer newSerializer) {
+                       final String newOuterConfiguration = newSerializer.getOuterConfiguration();
+                       return outerConfiguration.equals(newOuterConfiguration);
+               }
+       }
+
+       /**
+        * The schema compatibility result that a {@link NestedSerializerSnapshot} reports
+        * when it is resolved against a new {@link NestedSerializer}.
+        *
+        * <p>{@link NestedSerializerSnapshot} writes and reads these constants by ordinal,
+        * so the declaration order is part of the written snapshot format.
+        */
+       public enum TargetCompatibility {
+               COMPATIBLE_AS_IS,
+               COMPATIBLE_AFTER_MIGRATION,
+               COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
+               INCOMPATIBLE
+       }
+
+       /**
+        * Used as nested serializers in the test composite serializer.
+        * A nested serializer can be configured with a {@link TargetCompatibility},
+        * which indicates what the result of the schema compatibility check should be
+        * when a new instance of it is being checked for compatibility.
+        *
+        * <p>Serialization, deserialization, copying and instance creation are delegated to
+        * {@link StringSerializer#INSTANCE}.
+        */
+       public static class NestedSerializer extends TypeSerializer<String> {
+
+               private static final long serialVersionUID = 
-6175000932620623446L;
+
+               private static final StringSerializer delegateSerializer = 
StringSerializer.INSTANCE;
+
+               private final TargetCompatibility targetCompatibility;
+
+               NestedSerializer(TargetCompatibility targetCompatibility) {
+                       this.targetCompatibility = targetCompatibility;
+               }
+
+               @Override
+               public TypeSerializerSnapshot<String> snapshotConfiguration() {
+                       // the snapshot carries the configured target compatibility
+                       return new 
NestedSerializerSnapshot(targetCompatibility);
+               }
+
+               // 
--------------------------------------------------------------------------------
+               //  Serialization delegation
+               // 
--------------------------------------------------------------------------------
+
+               @Override
+               public String deserialize(String reuse, DataInputView source) 
throws IOException {
+                       return delegateSerializer.deserialize(reuse, source);
+               }
+
+               @Override
+               public String deserialize(DataInputView source) throws 
IOException {
+                       return delegateSerializer.deserialize(source);
+               }
+
+               @Override
+               public void serialize(String record, DataOutputView target) 
throws IOException {
+                       delegateSerializer.serialize(record, target);
+               }
+
+               @Override
+               public void copy(DataInputView source, DataOutputView target) 
throws IOException {
+                       delegateSerializer.copy(source, target);
+               }
+
+               @Override
+               public String copy(String from) {
+                       return delegateSerializer.copy(from);
+               }
+
+               @Override
+               public String copy(String from, String reuse) {
+                       return delegateSerializer.copy(from, reuse);
+               }
+
+               @Override
+               public String createInstance() {
+                       return delegateSerializer.createInstance();
+               }
+
+               @Override
+               public TypeSerializer<String> duplicate() {
+                       // hands out this very instance instead of creating a copy
+                       return this;
+               }
+
+               @Override
+               public boolean isImmutableType() {
+                       return false;
+               }
+
+               @Override
+               public int getLength() {
+                       return 0;
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       // canEqual uses instanceof, so instances of subclasses with the same
+                       // target compatibility compare equal as well
+                       if (canEqual(obj)) {
+                               return targetCompatibility == 
((NestedSerializer) obj).targetCompatibility;
+                       }
+                       return false;
+               }
+
+               @Override
+               public int hashCode() {
+                       return targetCompatibility.hashCode();
+               }
+
+               @Override
+               public boolean canEqual(Object obj) {
+                       return obj instanceof NestedSerializer;
+               }
+       }
+
+       /**
+        * Snapshot of the {@link NestedSerializer}.
+        *
+        * <p>The only state it carries is the {@link TargetCompatibility} of the snapshotted
+        * serializer, which is also the result it reports from the compatibility check.
+        */
+       public static class NestedSerializerSnapshot implements TypeSerializerSnapshot<String> {
+
+               private TargetCompatibility targetCompatibility;
+
+               /** Constructor used when the snapshot is instantiated for reading. */
+               public NestedSerializerSnapshot() {}
+
+               public NestedSerializerSnapshot(TargetCompatibility targetCompatibility) {
+                       this.targetCompatibility = targetCompatibility;
+               }
+
+               @Override
+               public int getCurrentVersion() {
+                       return 1;
+               }
+
+               @Override
+               public void writeSnapshot(DataOutputView out) throws IOException {
+                       // the target compatibility is written as its enum ordinal
+                       final int ordinal = targetCompatibility.ordinal();
+                       out.writeInt(ordinal);
+               }
+
+               @Override
+               public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+                       final int ordinal = in.readInt();
+                       targetCompatibility = TargetCompatibility.values()[ordinal];
+               }
+
+               @Override
+               public TypeSerializer<String> restoreSerializer() {
+                       return new RestoredNestedSerializer(targetCompatibility);
+               }
+
+               @Override
+               public TypeSerializerSchemaCompatibility<String> resolveSchemaCompatibility(TypeSerializer<String> newSerializer) {
+                       // the exact class is compared instead of using instanceof; this ensures that we were
+                       // handed a new serializer, and not a ReconfiguredNestedSerializer or RestoredNestedSerializer
+                       if (newSerializer.getClass() != NestedSerializer.class) {
+                               throw new IllegalArgumentException("Expected the new serializer to be of class " + NestedSerializer.class);
+                       }
+
+                       switch (targetCompatibility) {
+                               case COMPATIBLE_AS_IS:
+                                       return TypeSerializerSchemaCompatibility.compatibleAsIs();
+                               case COMPATIBLE_AFTER_MIGRATION:
+                                       return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
+                               case COMPATIBLE_WITH_RECONFIGURED_SERIALIZER:
+                                       return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(
+                                               new ReconfiguredNestedSerializer(targetCompatibility));
+                               case INCOMPATIBLE:
+                                       return TypeSerializerSchemaCompatibility.incompatible();
+                               default:
+                                       throw new IllegalStateException("Unexpected target compatibility.");
+                       }
+               }
+       }
+
+       /**
+        * A variant of the {@link NestedSerializer} used only when creating a reconfigured instance
+        * of the serializer. This is used in tests as a tag to identify that the correct serializer
+        * instances are being used.
+        *
+        * <p>It adds no state or behavior of its own; tests tell it apart by its exact class.
+        */
+       static class ReconfiguredNestedSerializer extends NestedSerializer {
+
+               private static final long serialVersionUID = 
-1396401178636869659L;
+
+               public ReconfiguredNestedSerializer(TargetCompatibility 
targetCompatibility) {
+                       super(targetCompatibility);
+               }
+
+       }
+
+       /**
+        * A variant of the {@link NestedSerializer} used only when creating a restored instance
+        * of the serializer. This is used in tests as a tag to identify that the correct serializer
+        * instances are being used.
+        *
+        * <p>It adds no state or behavior of its own; tests tell it apart by its exact class.
+        */
+       static class RestoredNestedSerializer extends NestedSerializer {
+
+               // NOTE(review): same value as ReconfiguredNestedSerializer's serialVersionUID;
+               // looks copy-pasted - consider giving this class its own value
+               private static final long serialVersionUID = 
-1396401178636869659L;
+
+               public RestoredNestedSerializer(TargetCompatibility 
targetCompatibility) {
+                       super(targetCompatibility);
+               }
+
+       }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
index 186f50482af..ea18309ac98 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
@@ -103,6 +103,8 @@ public void movingForward() throws IOException {
                TypeSerializer<ElementT> restoredSerializer = 
previousSnapshot.restoreSerializer();
 
                TypeSerializerSnapshot<ElementT> nextSnapshot = 
restoredSerializer.snapshotConfiguration();
+               assertThat(nextSnapshot, 
instanceOf(testSpecification.snapshotClass));
+
                TypeSerializerSnapshot<ElementT> nextSnapshotDeserialized = 
writeAndThenReadTheSnapshot(restoredSerializer, nextSnapshot);
 
                assertThat(nextSnapshotDeserialized, allOf(
@@ -247,6 +249,10 @@ private Path getSnapshotDataLocation() {
                        return resourcePath(this.snapshotDataLocation);
                }
 
+               public Class<? extends TypeSerializerSnapshot<T>> 
getSnapshotClass() {
+                       return snapshotClass;
+               }
+
                @Override
                public String toString() {
                        return String.format("%s , %s, %s", name, 
serializerType.getSimpleName(), snapshotClass.getSimpleName());
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
index 7afbc508230..ae1452bf672 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
@@ -187,7 +187,7 @@ public boolean canEqual(Object obj) {
 
                @Override
                public TypeSerializerSnapshot<Lockable<E>> 
snapshotConfiguration() {
-                       return new 
LockableTypeSerializerSnapshot<>(elementSerializer);
+                       return new LockableTypeSerializerSnapshot<>(this);
                }
 
                /**
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
index 44a4670cc1d..13867aca0c3 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
@@ -19,74 +19,46 @@
 package org.apache.flink.cep.nfa.sharedbuffer;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A {@link TypeSerializerSnapshot} for the {@link 
Lockable.LockableTypeSerializer}.
  */
 @Internal
-public class LockableTypeSerializerSnapshot<E> implements 
TypeSerializerSnapshot<Lockable<E>> {
+public class LockableTypeSerializerSnapshot<E> extends 
CompositeTypeSerializerSnapshot<Lockable<E>, Lockable.LockableTypeSerializer> {
 
        private static final int CURRENT_VERSION = 1;
 
-       private CompositeSerializerSnapshot nestedElementSerializerSnapshot;
-
        /**
         * Constructor for read instantiation.
         */
-       public LockableTypeSerializerSnapshot() {}
+       public LockableTypeSerializerSnapshot() {
+               super(Lockable.LockableTypeSerializer.class);
+       }
 
        /**
         * Constructor to create the snapshot for writing.
         */
-       public LockableTypeSerializerSnapshot(TypeSerializer<E> 
elementSerializer) {
-               this.nestedElementSerializerSnapshot = new 
CompositeSerializerSnapshot(Preconditions.checkNotNull(elementSerializer));
+       public 
LockableTypeSerializerSnapshot(Lockable.LockableTypeSerializer<E> 
lockableTypeSerializer) {
+               super(lockableTypeSerializer);
        }
 
        @Override
-       public int getCurrentVersion() {
+       public int getCurrentOuterSnapshotVersion() {
                return CURRENT_VERSION;
        }
 
        @Override
-       public TypeSerializer<Lockable<E>> restoreSerializer() {
-               return new 
Lockable.LockableTypeSerializer<>(nestedElementSerializerSnapshot.getRestoreSerializer(0));
+       protected Lockable.LockableTypeSerializer 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
+               @SuppressWarnings("unchecked")
+               TypeSerializer<E> elementSerializer = (TypeSerializer<E>) 
nestedSerializers[0];
+               return new Lockable.LockableTypeSerializer<>(elementSerializer);
        }
 
        @Override
-       public TypeSerializerSchemaCompatibility<Lockable<E>> 
resolveSchemaCompatibility(TypeSerializer<Lockable<E>> newSerializer) {
-               checkState(nestedElementSerializerSnapshot != null);
-
-               if (newSerializer instanceof Lockable.LockableTypeSerializer) {
-                       Lockable.LockableTypeSerializer<E> serializer = 
(Lockable.LockableTypeSerializer<E>) newSerializer;
-
-                       return 
nestedElementSerializerSnapshot.resolveCompatibilityWithNested(
-                               
TypeSerializerSchemaCompatibility.compatibleAsIs(),
-                               serializer.getElementSerializer());
-               }
-               else {
-                       return TypeSerializerSchemaCompatibility.incompatible();
-               }
+       protected TypeSerializer<?>[] 
getNestedSerializers(Lockable.LockableTypeSerializer outerSerializer) {
+               return new TypeSerializer<?>[] { 
outerSerializer.getElementSerializer() };
        }
-
-       @Override
-       public void writeSnapshot(DataOutputView out) throws IOException {
-               nestedElementSerializerSnapshot.writeCompositeSnapshot(out);
-       }
-
-       @Override
-       public void readSnapshot(int readVersion, DataInputView in, ClassLoader 
userCodeClassLoader) throws IOException {
-               this.nestedElementSerializerSnapshot = 
CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
-       }
-
 }
diff --git 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
index cca84d28d17..90468acf8c3 100644
--- 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
+++ 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
@@ -18,76 +18,50 @@
 
 package org.apache.flink.table.dataview;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.table.api.dataview.ListView;
-import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.util.List;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * A {@link TypeSerializerSnapshot} for the {@link ListViewSerializer}.
  *
  * @param <T> the type of the list elements.
  */
-public final class ListViewSerializerSnapshot<T> implements TypeSerializerSnapshot<ListView<T>> {
+public final class ListViewSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<ListView<T>, ListViewSerializer> {
 
        private static final int CURRENT_VERSION = 1;
 
-       private CompositeSerializerSnapshot nestedListSerializerSnapshot;
-
        /**
         * Constructor for read instantiation.
         */
-       public ListViewSerializerSnapshot() {}
+       public ListViewSerializerSnapshot() {
+               super(ListViewSerializer.class);
+       }
 
        /**
         * Constructor to create the snapshot for writing.
         */
-       public ListViewSerializerSnapshot(TypeSerializer<List<T>> listSerializer) {
-               this.nestedListSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(listSerializer));
+       public ListViewSerializerSnapshot(ListViewSerializer<T> listViewSerializer) {
+               super(listViewSerializer);
        }
 
        @Override
-       public int getCurrentVersion() {
+       public int getCurrentOuterSnapshotVersion() {
                return CURRENT_VERSION;
        }
 
        @Override
-       public TypeSerializer<ListView<T>> restoreSerializer() {
-               return new ListViewSerializer<>(nestedListSerializerSnapshot.getRestoreSerializer(0));
-       }
-
-       @Override
-       public TypeSerializerSchemaCompatibility<ListView<T>> resolveSchemaCompatibility(TypeSerializer<ListView<T>> newSerializer) {
-               checkState(nestedListSerializerSnapshot != null);
-
-               if (newSerializer instanceof ListViewSerializer) {
-                       ListViewSerializer<T> serializer = (ListViewSerializer<T>) newSerializer;
-
-                       return nestedListSerializerSnapshot.resolveCompatibilityWithNested(
-                               TypeSerializerSchemaCompatibility.compatibleAsIs(),
-                               serializer.getListSerializer());
-               }
-               else {
-                       return TypeSerializerSchemaCompatibility.incompatible();
-               }
-       }
-
-       @Override
-       public void writeSnapshot(DataOutputView out) throws IOException {
-               nestedListSerializerSnapshot.writeCompositeSnapshot(out);
+       protected ListViewSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+               @SuppressWarnings("unchecked")
+               TypeSerializer<List<T>> listSerializer = (TypeSerializer<List<T>>) nestedSerializers[0];
+               return new ListViewSerializer<>(listSerializer);
        }
 
        @Override
-       public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-               this.nestedListSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+       protected TypeSerializer<?>[] getNestedSerializers(ListViewSerializer outerSerializer) {
+               return new TypeSerializer<?>[] { outerSerializer.getListSerializer() };
        }
 }
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
index f59fc0a3654..132f42f3320 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
@@ -18,78 +18,51 @@
 
 package org.apache.flink.table.dataview;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.table.api.dataview.MapView;
-import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.util.Map;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * A {@link TypeSerializerSnapshot} for the {@link MapViewSerializer}.
  *
  * @param <K> the key type of the map entries.
  * @param <V> the value type of the map entries.
  */
-public class MapViewSerializerSnapshot<K, V> implements TypeSerializerSnapshot<MapView<K, V>> {
+public class MapViewSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<MapView<K, V>, MapViewSerializer> {
 
        private static final int CURRENT_VERSION = 1;
 
-       private CompositeSerializerSnapshot nestedMapSerializerSnapshot;
-
        /**
         * Constructor for read instantiation.
         */
-       public MapViewSerializerSnapshot() {}
+       public MapViewSerializerSnapshot() {
+               super(MapViewSerializer.class);
+       }
 
        /**
         * Constructor to create the snapshot for writing.
         */
-       public MapViewSerializerSnapshot(TypeSerializer<Map<K, V>> mapSerializer) {
-               this.nestedMapSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(mapSerializer));
+       public MapViewSerializerSnapshot(MapViewSerializer<K, V> mapViewSerializer) {
+               super(mapViewSerializer);
        }
 
        @Override
-       public int getCurrentVersion() {
+       public int getCurrentOuterSnapshotVersion() {
                return CURRENT_VERSION;
        }
 
        @Override
-       public TypeSerializer<MapView<K, V>> restoreSerializer() {
-               return new MapViewSerializer<>(nestedMapSerializerSnapshot.getRestoreSerializer(0));
-       }
-
-       @Override
-       public TypeSerializerSchemaCompatibility<MapView<K, V>> resolveSchemaCompatibility(
-                       TypeSerializer<MapView<K, V>> newSerializer) {
-               checkState(nestedMapSerializerSnapshot != null);
-
-               if (newSerializer instanceof MapViewSerializer) {
-                       MapViewSerializer<K, V> serializer = (MapViewSerializer<K, V>) newSerializer;
-
-                       return nestedMapSerializerSnapshot.resolveCompatibilityWithNested(
-                               TypeSerializerSchemaCompatibility.compatibleAsIs(),
-                               serializer.getMapSerializer());
-               }
-               else {
-                       return TypeSerializerSchemaCompatibility.incompatible();
-               }
-       }
-
-       @Override
-       public void writeSnapshot(DataOutputView out) throws IOException {
-               nestedMapSerializerSnapshot.writeCompositeSnapshot(out);
+       protected MapViewSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+               @SuppressWarnings("unchecked")
+               TypeSerializer<Map<K, V>> mapSerializer = (TypeSerializer<Map<K, V>>) nestedSerializers[0];
+               return new MapViewSerializer<>(mapSerializer);
        }
 
        @Override
-       public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-               this.nestedMapSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+       protected TypeSerializer<?>[] getNestedSerializers(MapViewSerializer outerSerializer) {
+               return new TypeSerializer<?>[] { outerSerializer.getMapSerializer() };
        }
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
index 246af6c0dab..2d48c3daf16 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
@@ -77,7 +77,7 @@ class ListViewSerializer[T](val listSerializer: TypeSerializer[java.util.List[T]
     listSerializer.equals(obj.asInstanceOf[ListViewSerializer[_]].listSerializer)
 
   override def snapshotConfiguration(): ListViewSerializerSnapshot[T] =
-    new ListViewSerializerSnapshot[T](listSerializer)
+    new ListViewSerializerSnapshot[T](this)
 
   override def ensureCompatibility(
       configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[ListView[T]] = {
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
index 89cdf701749..e0067c57b10 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
@@ -78,7 +78,7 @@ class MapViewSerializer[K, V](val mapSerializer: TypeSerializer[java.util.Map[K,
     mapSerializer.equals(obj.asInstanceOf[MapViewSerializer[_, _]].mapSerializer)
 
   override def snapshotConfiguration(): MapViewSerializerSnapshot[K, V] =
-    new MapViewSerializerSnapshot[K, V](mapSerializer)
+    new MapViewSerializerSnapshot[K, V](this)
 
   // copy and modified from MapSerializer.ensureCompatibility
   override def ensureCompatibility(configSnapshot: TypeSerializerConfigSnapshot[_])
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index 6fa9f02d419..d442d0d3ec7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -147,7 +147,7 @@ public int hashCode() {
 
        @Override
        public TypeSerializerSnapshot<ArrayList<T>> snapshotConfiguration() {
-               return new ArrayListSerializerSnapshot<>(elementSerializer);
+               return new ArrayListSerializerSnapshot<>(this);
        }
 
        /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
index 7fc8c51c8cf..dde8d1a1e60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
@@ -18,72 +18,46 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 
-import java.io.IOException;
 import java.util.ArrayList;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * Snapshot class for the {@link ArrayListSerializer}.
  */
-public class ArrayListSerializerSnapshot<T> implements TypeSerializerSnapshot<ArrayList<T>> {
+public class ArrayListSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<ArrayList<T>, ArrayListSerializer> {
 
        private static final int CURRENT_VERSION = 1;
 
-       private CompositeSerializerSnapshot nestedElementSerializerSnapshot;
-
        /**
         * Constructor for read instantiation.
         */
-       public ArrayListSerializerSnapshot() {}
+       public ArrayListSerializerSnapshot() {
+               super(ArrayListSerializer.class);
+       }
 
        /**
         * Constructor for creating the snapshot for writing.
         */
-       public ArrayListSerializerSnapshot(TypeSerializer<T> elementSerializer) {
-               this.nestedElementSerializerSnapshot = new CompositeSerializerSnapshot(elementSerializer);
+       public ArrayListSerializerSnapshot(ArrayListSerializer<T> arrayListSerializer) {
+               super(arrayListSerializer);
        }
 
        @Override
-       public int getCurrentVersion() {
+       public int getCurrentOuterSnapshotVersion() {
                return CURRENT_VERSION;
        }
 
        @Override
-       public TypeSerializer<ArrayList<T>> restoreSerializer() {
-               return new ArrayListSerializer<>(nestedElementSerializerSnapshot.getRestoreSerializer(0));
-       }
-
-       @Override
-       public TypeSerializerSchemaCompatibility<ArrayList<T>> resolveSchemaCompatibility(TypeSerializer<ArrayList<T>> newSerializer) {
-               checkState(nestedElementSerializerSnapshot != null);
-
-               if (newSerializer instanceof ArrayListSerializer) {
-                       ArrayListSerializer<T> serializer = (ArrayListSerializer<T>) newSerializer;
-
-                       return nestedElementSerializerSnapshot.resolveCompatibilityWithNested(
-                               TypeSerializerSchemaCompatibility.compatibleAsIs(),
-                               serializer.getElementSerializer());
-               }
-               else {
-                       return TypeSerializerSchemaCompatibility.incompatible();
-               }
-       }
-
-       @Override
-       public void writeSnapshot(DataOutputView out) throws IOException {
-               nestedElementSerializerSnapshot.writeCompositeSnapshot(out);
+       protected ArrayListSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+               @SuppressWarnings("unchecked")
+               TypeSerializer<T> elementSerializer = (TypeSerializer<T>) nestedSerializers[0];
+               return new ArrayListSerializer<>(elementSerializer);
        }
 
        @Override
-       public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-               this.nestedElementSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+       protected TypeSerializer<?>[] getNestedSerializers(ArrayListSerializer outerSerializer) {
+               return new TypeSerializer<?>[] { outerSerializer.getElementSerializer() };
        }
 }
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
index b67e47b4eff..26cfef5a1a8 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
@@ -18,81 +18,51 @@
 
 package org.apache.flink.api.scala.typeutils;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
 
 import scala.util.Either;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * Configuration snapshot for serializers of Scala's {@link Either} type,
  * containing configuration snapshots of the Left and Right serializers.
  */
-public class ScalaEitherSerializerSnapshot<L, R> implements TypeSerializerSnapshot<Either<L, R>> {
+public class ScalaEitherSerializerSnapshot<L, R> extends CompositeTypeSerializerSnapshot<Either<L, R>, EitherSerializer> {
 
        private static final int CURRENT_VERSION = 1;
 
-       private CompositeSerializerSnapshot nestedLeftRightSerializerSnapshot;
-
        /**
         * Constructor for read instantiation.
         */
-       public ScalaEitherSerializerSnapshot() {}
+       public ScalaEitherSerializerSnapshot() {
+               super(EitherSerializer.class);
+       }
 
        /**
         * Constructor to create the snapshot for writing.
         */
-       public ScalaEitherSerializerSnapshot(TypeSerializer<L> leftSerializer, TypeSerializer<R> rightSerializer) {
-               Preconditions.checkNotNull(leftSerializer);
-               Preconditions.checkNotNull(rightSerializer);
-               this.nestedLeftRightSerializerSnapshot = new CompositeSerializerSnapshot(leftSerializer, rightSerializer);
+       public ScalaEitherSerializerSnapshot(EitherSerializer<L, R> eitherSerializer) {
+               super(eitherSerializer);
        }
 
        @Override
-       public int getCurrentVersion() {
+       public int getCurrentOuterSnapshotVersion() {
                return CURRENT_VERSION;
        }
 
        @Override
-       public TypeSerializer<Either<L, R>> restoreSerializer() {
-               return new EitherSerializer<>(
-                       nestedLeftRightSerializerSnapshot.getRestoreSerializer(0),
-                       nestedLeftRightSerializerSnapshot.getRestoreSerializer(1));
-       }
-
-       @Override
-       public TypeSerializerSchemaCompatibility<Either<L, R>> resolveSchemaCompatibility(
-                       TypeSerializer<Either<L, R>> newSerializer) {
-               checkState(nestedLeftRightSerializerSnapshot != null);
+       protected EitherSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+               @SuppressWarnings("unchecked")
+               TypeSerializer<L> leftSerializer = (TypeSerializer<L>) nestedSerializers[0];
 
-               if (newSerializer instanceof EitherSerializer) {
-                       EitherSerializer<L, R> serializer = (EitherSerializer<L, R>) newSerializer;
+               @SuppressWarnings("unchecked")
+               TypeSerializer<R> rightSerializer = (TypeSerializer<R>) nestedSerializers[1];
 
-                       return nestedLeftRightSerializerSnapshot.resolveCompatibilityWithNested(
-                               TypeSerializerSchemaCompatibility.compatibleAsIs(),
-                               serializer.getLeftSerializer(),
-                               serializer.getRightSerializer());
-               }
-               else {
-                       return TypeSerializerSchemaCompatibility.incompatible();
-               }
-       }
-
-       @Override
-       public void writeSnapshot(DataOutputView out) throws IOException {
-               nestedLeftRightSerializerSnapshot.writeCompositeSnapshot(out);
+               return new EitherSerializer<>(leftSerializer, rightSerializer);
        }
 
        @Override
-       public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-               this.nestedLeftRightSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+       protected TypeSerializer<?>[] getNestedSerializers(EitherSerializer outerSerializer) {
+               return new TypeSerializer<?>[] { outerSerializer.getLeftSerializer(), outerSerializer.getRightSerializer() };
        }
 }
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 68432a6f1a5..0427bb35f2b 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -125,7 +125,7 @@ class EitherSerializer[A, B](
   // --------------------------------------------------------------------------------------------
 
   override def snapshotConfiguration(): ScalaEitherSerializerSnapshot[A, B] = {
-    new ScalaEitherSerializerSnapshot[A, B](leftSerializer, rightSerializer)
+    new ScalaEitherSerializerSnapshot[A, B](this)
   }
 
   override def ensureCompatibility(


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to