http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
index b155005..953c4fa 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
@@ -33,7 +33,6 @@ public final class FloatSerializer extends 
TypeSerializerSingleton<Float> {
        
        private static final Float ZERO = Float.valueOf(0);
 
-       
        @Override
        public boolean isImmutableType() {
                return true;
@@ -83,4 +82,10 @@ public final class FloatSerializer extends 
TypeSerializerSingleton<Float> {
        public boolean canEqual(Object obj) {
                return obj instanceof FloatSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || 
identifier.equals(FloatValueSerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
index 3ccd88a..2542be9 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
@@ -31,8 +31,7 @@ public class FloatValueSerializer extends 
TypeSerializerSingleton<FloatValue> {
        private static final long serialVersionUID = 1L;
        
        public static final FloatValueSerializer INSTANCE = new 
FloatValueSerializer();
-       
-       
+
        @Override
        public boolean isImmutableType() {
                return false;
@@ -84,4 +83,10 @@ public class FloatValueSerializer extends 
TypeSerializerSingleton<FloatValue> {
        public boolean canEqual(Object obj) {
                return obj instanceof FloatValueSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || 
identifier.equals(FloatSerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index f35d71b..fe61ab3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -22,7 +22,9 @@ import java.io.IOException;
 import java.lang.reflect.Array;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -31,7 +33,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * A serializer for arrays of objects.
  * 
- * @param <C> The component type
+ * @param <C> The component type.
  */
 @Internal
 public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
@@ -186,4 +188,36 @@ public final class GenericArraySerializer<C> extends 
TypeSerializer<C[]> {
        public String toString() {
                return "Serializer " + componentClass.getName() + "[]";
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public GenericArraySerializerConfigSnapshot snapshotConfiguration() {
+               return new 
GenericArraySerializerConfigSnapshot<>(componentClass, 
componentSerializer.snapshotConfiguration());
+       }
+
+       @Override
+       public CompatibilityResult<C[]> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof 
GenericArraySerializerConfigSnapshot) {
+                       final GenericArraySerializerConfigSnapshot config = 
(GenericArraySerializerConfigSnapshot) configSnapshot;
+
+                       if (componentClass.equals(config.getComponentClass())) {
+                               CompatibilityResult<C> compatResult = 
componentSerializer.ensureCompatibility(
+                                       
config.getSingleNestedSerializerConfigSnapshot());
+
+                               if (!compatResult.requiresMigration()) {
+                                       return 
CompatibilityResult.requiresMigration(null);
+                               } else if 
(compatResult.getConvertDeserializer() != null) {
+                                       return 
CompatibilityResult.requiresMigration(
+                                               new GenericArraySerializer<>(
+                                                       componentClass,
+                                                       
compatResult.getConvertDeserializer()));
+                               }
+                       }
+               }
+
+               return CompatibilityResult.requiresMigration(null);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
new file mode 100644
index 0000000..e78eb6c
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Point-in-time configuration of a {@link GenericArraySerializer}.
+ *
+ * @param <C> The component type.
+ */
+@Internal
+public final class GenericArraySerializerConfigSnapshot<C> extends 
CompositeTypeSerializerConfigSnapshot {
+
+       private static final int VERSION = 1;
+
+       private Class<C> componentClass;
+
+       /** This empty nullary constructor is required for deserializing the 
configuration. */
+       public GenericArraySerializerConfigSnapshot() {}
+
+       public GenericArraySerializerConfigSnapshot(
+                       Class<C> componentClass,
+                       TypeSerializerConfigSnapshot 
componentSerializerConfigSnapshot) {
+
+               super(componentSerializerConfigSnapshot);
+
+               this.componentClass = 
Preconditions.checkNotNull(componentClass);
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               super.write(out);
+
+               InstantiationUtil.serializeObject(new 
DataOutputViewStream(out), componentClass);
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               super.read(in);
+
+               try {
+                       componentClass = 
InstantiationUtil.deserializeObject(new DataInputViewStream(in), 
getUserCodeClassLoader());
+               } catch (ClassNotFoundException e) {
+                       throw new IOException("Could not find requested element 
class in classpath.", e);
+               }
+       }
+
+       @Override
+       public int getVersion() {
+               return VERSION;
+       }
+
+       public Class<C> getComponentClass() {
+               return componentClass;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               return super.equals(obj)
+                       && (obj instanceof GenericArraySerializerConfigSnapshot)
+                       && 
(componentClass.equals(((GenericArraySerializerConfigSnapshot) 
obj).getComponentClass()));
+       }
+
+       @Override
+       public int hashCode() {
+               return super.hashCode() * 31 + componentClass.hashCode();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
index de6b326..e046e8e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
@@ -33,7 +33,6 @@ public final class IntSerializer extends 
TypeSerializerSingleton<Integer> {
        
        private static final Integer ZERO = Integer.valueOf(0);
 
-
        @Override
        public boolean isImmutableType() {
                return true;
@@ -83,4 +82,10 @@ public final class IntSerializer extends 
TypeSerializerSingleton<Integer> {
        public boolean canEqual(Object obj) {
                return obj instanceof IntSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || 
identifier.equals(IntValueSerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
index 1a90313..65f5e35 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
@@ -31,8 +31,7 @@ public final class IntValueSerializer extends 
TypeSerializerSingleton<IntValue>
        private static final long serialVersionUID = 1L;
        
        public static final IntValueSerializer INSTANCE = new 
IntValueSerializer();
-       
-       
+
        @Override
        public boolean isImmutableType() {
                return false;
@@ -84,4 +83,10 @@ public final class IntValueSerializer extends 
TypeSerializerSingleton<IntValue>
        public boolean canEqual(Object obj) {
                return obj instanceof IntValueSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || 
identifier.equals(IntSerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
index 5d5e8f8..02d22de 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -19,7 +19,9 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -39,7 +41,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * @param <T> The type of element in the list.
  */
 @Internal
-public class ListSerializer<T> extends TypeSerializer<List<T>> {
+public final class ListSerializer<T> extends TypeSerializer<List<T>> {
 
        private static final long serialVersionUID = 1119562170939152304L;
 
@@ -166,4 +168,30 @@ public class ListSerializer<T> extends 
TypeSerializer<List<T>> {
        public int hashCode() {
                return elementSerializer.hashCode();
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public CollectionSerializerConfigSnapshot snapshotConfiguration() {
+               return new 
CollectionSerializerConfigSnapshot(elementSerializer.snapshotConfiguration());
+       }
+
+       @Override
+       public CompatibilityResult<List<T>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof 
CollectionSerializerConfigSnapshot) {
+                       CompatibilityResult<T> compatResult = 
elementSerializer.ensureCompatibility(
+                               ((CollectionSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerConfigSnapshot());
+
+                       if (!compatResult.requiresMigration()) {
+                               return CompatibilityResult.compatible();
+                       } else if (compatResult.getConvertDeserializer() != 
null) {
+                               return CompatibilityResult.requiresMigration(
+                                       new 
ListSerializer<>(compatResult.getConvertDeserializer()));
+                       }
+               }
+
+               return CompatibilityResult.requiresMigration(null);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
index 086a2fc..cbdc3db 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
@@ -33,7 +33,6 @@ public final class LongSerializer extends 
TypeSerializerSingleton<Long> {
        
        private static final Long ZERO = Long.valueOf(0);
 
-
        @Override
        public boolean isImmutableType() {
                return true;
@@ -83,4 +82,10 @@ public final class LongSerializer extends 
TypeSerializerSingleton<Long> {
        public boolean canEqual(Object obj) {
                return obj instanceof LongSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || 
identifier.equals(LongValueSerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
index 5d94325..dcf3805 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
@@ -31,8 +31,7 @@ public final class LongValueSerializer extends 
TypeSerializerSingleton<LongValue
        private static final long serialVersionUID = 1L;
        
        public static final LongValueSerializer INSTANCE = new 
LongValueSerializer();
-       
-       
+
        @Override
        public boolean isImmutableType() {
                return false;
@@ -84,4 +83,10 @@ public final class LongValueSerializer extends 
TypeSerializerSingleton<LongValue
        public boolean canEqual(Object obj) {
                return obj instanceof LongValueSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || 
identifier.equals(LongSerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
index 5e1a3bf..50900e4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.api.common.typeutils.base;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
@@ -38,7 +41,8 @@ import java.util.HashMap;
  * @param <K> The type of the keys in the map.
  * @param <V> The type of the values in the map.
  */
-public class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> {
+@Internal
+public final class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> {
 
        private static final long serialVersionUID = -6885593032367050078L;
        
@@ -190,4 +194,37 @@ public class MapSerializer<K, V> extends 
TypeSerializer<Map<K, V>> {
        public int hashCode() {
                return keySerializer.hashCode() * 31 + 
valueSerializer.hashCode();
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public MapSerializerConfigSnapshot snapshotConfiguration() {
+               return new MapSerializerConfigSnapshot(
+                               keySerializer.snapshotConfiguration(),
+                               valueSerializer.snapshotConfiguration());
+       }
+
+       @Override
+       public CompatibilityResult<Map<K, V>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof MapSerializerConfigSnapshot) {
+                       TypeSerializerConfigSnapshot[] 
keyValueSerializerConfigSnapshots =
+                               ((MapSerializerConfigSnapshot) 
configSnapshot).getNestedSerializerConfigSnapshots();
+
+                       CompatibilityResult<K> keyCompatResult = 
keySerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[0]);
+                       CompatibilityResult<V> valueCompatResult = 
valueSerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[1]);
+
+                       if (!keyCompatResult.requiresMigration() && 
!valueCompatResult.requiresMigration()) {
+                               return CompatibilityResult.compatible();
+                       } else if (keyCompatResult.getConvertDeserializer() != 
null && valueCompatResult.getConvertDeserializer() != null) {
+                               return CompatibilityResult.requiresMigration(
+                                       new MapSerializer<>(
+                                               
keyCompatResult.getConvertDeserializer(),
+                                               
valueCompatResult.getConvertDeserializer()));
+                       }
+               }
+
+               return CompatibilityResult.requiresMigration(null);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
new file mode 100644
index 0000000..38e1254
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+
+/**
+ * Configuration snapshot for serializers of maps, containing the
+ * configuration snapshot of its key serializer and value serializer.
+ */
+@Internal
+public final class MapSerializerConfigSnapshot extends 
CompositeTypeSerializerConfigSnapshot {
+
+       private static final int VERSION = 1;
+
+       /** This empty nullary constructor is required for deserializing the 
configuration. */
+       public MapSerializerConfigSnapshot() {}
+
+       public MapSerializerConfigSnapshot(
+                       TypeSerializerConfigSnapshot 
keySerializerConfigSnapshot,
+                       TypeSerializerConfigSnapshot 
valueSerializerConfigSnapshot) {
+
+               super(keySerializerConfigSnapshot, 
valueSerializerConfigSnapshot);
+       }
+
+       @Override
+       public int getVersion() {
+               return VERSION;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
index b94c0ac..322591d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
@@ -33,7 +33,6 @@ public final class ShortSerializer extends 
TypeSerializerSingleton<Short> {
        
        private static final Short ZERO = Short.valueOf((short)0);
 
-       
        @Override
        public boolean isImmutableType() {
                return true;
@@ -83,4 +82,10 @@ public final class ShortSerializer extends 
TypeSerializerSingleton<Short> {
        public boolean canEqual(Object obj) {
                return obj instanceof ShortSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || 
identifier.equals(ShortValueSerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
index 51b234e..9e90927 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
@@ -31,8 +31,7 @@ public final class ShortValueSerializer extends 
TypeSerializerSingleton<ShortVal
        private static final long serialVersionUID = 1L;
        
        public static final ShortValueSerializer INSTANCE = new 
ShortValueSerializer();
-       
-       
+
        @Override
        public boolean isImmutableType() {
                return false;
@@ -84,4 +83,10 @@ public final class ShortValueSerializer extends 
TypeSerializerSingleton<ShortVal
        public boolean canEqual(Object obj) {
                return obj instanceof ShortValueSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || 
identifier.equals(ShortSerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java
index 9e1bbfd..6e30a5f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java
@@ -102,4 +102,11 @@ public final class SqlDateSerializer extends 
TypeSerializerSingleton<Date> {
        public boolean canEqual(Object obj) {
                return obj instanceof SqlDateSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || 
identifier.equals(DateSerializer.class.getCanonicalName())
+                       || 
identifier.equals(SqlTimeSerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java
index 544cf0f..3fa06c4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java
@@ -101,4 +101,16 @@ public final class SqlTimeSerializer extends 
TypeSerializerSingleton<Time> {
        public boolean canEqual(Object obj) {
                return obj instanceof SqlTimeSerializer;
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & reconfiguring
+       // 
--------------------------------------------------------------------------------------------
+
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || 
identifier.equals(DateSerializer.class.getCanonicalName())
+                       || 
identifier.equals(SqlDateSerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
index 034a8cc..f2cd54f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
@@ -83,4 +83,10 @@ public final class StringSerializer extends 
TypeSerializerSingleton<String> {
        public boolean canEqual(Object obj) {
                return obj instanceof StringSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                               || 
identifier.equals(StringValue.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
index 7f86b76..4e7ff97 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
@@ -33,8 +33,7 @@ public final class StringValueSerializer extends 
TypeSerializerSingleton<StringV
        private static final int HIGH_BIT = 0x1 << 7;
        
        public static final StringValueSerializer INSTANCE = new 
StringValueSerializer();
-       
-       
+
        @Override
        public boolean isImmutableType() {
                return false;
@@ -109,4 +108,10 @@ public final class StringValueSerializer extends 
TypeSerializerSingleton<StringV
        public boolean canEqual(Object obj) {
                return obj instanceof StringValueSerializer;
        }
+
+       @Override
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return 
super.isCompatibleSerializationFormatIdentifier(identifier)
+                       || 
identifier.equals(StringSerializer.class.getCanonicalName());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
index c091072..c5decc5 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
@@ -19,7 +19,10 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 
 @Internal
 public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{
@@ -48,4 +51,33 @@ public abstract class TypeSerializerSingleton<T> extends 
TypeSerializer<T>{
                        return false;
                }
        }
+
+       @Override
+       public TypeSerializerConfigSnapshot snapshotConfiguration() {
+               // type serializer singletons should always be parameter-less
+               return new 
ParameterlessTypeSerializerConfig(getSerializationFormatIdentifier());
+       }
+
+       @Override
+       public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof ParameterlessTypeSerializerConfig
+                               && isCompatibleSerializationFormatIdentifier(
+                                               
((ParameterlessTypeSerializerConfig) 
configSnapshot).getSerializationFormatIdentifier())) {
+
+                       return CompatibilityResult.compatible();
+               } else {
+                       return CompatibilityResult.requiresMigration(null);
+               }
+       }
+
+       /**
+        * Subclasses can override this if they know that they are also 
compatible with identifiers of other formats.
+        */
+       protected boolean isCompatibleSerializationFormatIdentifier(String 
identifier) {
+               return identifier.equals(getSerializationFormatIdentifier());
+       }
+
+       private String getSerializationFormatIdentifier() {
+               return getClass().getCanonicalName();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
index 371200c..bf14f72 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
@@ -36,8 +36,7 @@ public final class BooleanPrimitiveArraySerializer extends 
TypeSerializerSinglet
        private static final boolean[] EMPTY = new boolean[0];
 
        public static final BooleanPrimitiveArraySerializer INSTANCE = new 
BooleanPrimitiveArraySerializer();
-       
-       
+
        @Override
        public boolean isImmutableType() {
                return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
index 0f56149..dbad1df 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
@@ -36,7 +36,7 @@ public final class BytePrimitiveArraySerializer extends 
TypeSerializerSingleton<
        private static final byte[] EMPTY = new byte[0];
 
        public static final BytePrimitiveArraySerializer INSTANCE = new 
BytePrimitiveArraySerializer();
-       
+
        @Override
        public boolean isImmutableType() {
                return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
index c3655e9..ae30109 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
@@ -36,8 +36,7 @@ public final class CharPrimitiveArraySerializer extends 
TypeSerializerSingleton<
        private static final char[] EMPTY = new char[0];
 
        public static final CharPrimitiveArraySerializer INSTANCE = new 
CharPrimitiveArraySerializer();
-       
-       
+
        @Override
        public boolean isImmutableType() {
                return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
index 2b07cfd..610a22b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
@@ -36,8 +36,7 @@ public final class DoublePrimitiveArraySerializer extends 
TypeSerializerSingleto
        private static final double[] EMPTY = new double[0];
 
        public static final DoublePrimitiveArraySerializer INSTANCE = new 
DoublePrimitiveArraySerializer();
-       
-       
+
        @Override
        public boolean isImmutableType() {
                return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
index 1fa4715..50b2b2e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
@@ -36,8 +36,7 @@ public final class FloatPrimitiveArraySerializer extends 
TypeSerializerSingleton
        private static final float[] EMPTY = new float[0];
 
        public static final FloatPrimitiveArraySerializer INSTANCE = new 
FloatPrimitiveArraySerializer();
-       
-       
+
        @Override
        public boolean isImmutableType() {
                return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
index a5a3e56..f11b46e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
@@ -36,8 +36,7 @@ public class IntPrimitiveArraySerializer extends 
TypeSerializerSingleton<int[]>{
        private static final int[] EMPTY = new int[0];
 
        public static final IntPrimitiveArraySerializer INSTANCE = new 
IntPrimitiveArraySerializer();
-       
-       
+
        @Override
        public boolean isImmutableType() {
                return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
index 3ecae36..b0788c9 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
@@ -36,8 +36,7 @@ public final class LongPrimitiveArraySerializer extends 
TypeSerializerSingleton<
        private static final long[] EMPTY = new long[0];
 
        public static final LongPrimitiveArraySerializer INSTANCE = new 
LongPrimitiveArraySerializer();
-       
-       
+
        @Override
        public boolean isImmutableType() {
                return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
index 7f542c3..107fe03 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
@@ -36,8 +36,7 @@ public final class ShortPrimitiveArraySerializer extends 
TypeSerializerSingleton
        private static final short[] EMPTY = new short[0];
 
        public static final ShortPrimitiveArraySerializer INSTANCE = new 
ShortPrimitiveArraySerializer();
-       
-       
+
        @Override
        public boolean isImmutableType() {
                return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
index ccf369a..61a654c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
@@ -38,8 +38,7 @@ public final class StringArraySerializer extends 
TypeSerializerSingleton<String[
        private static final String[] EMPTY = new String[0];
        
        public static final StringArraySerializer INSTANCE = new 
StringArraySerializer();
-       
-       
+
        @Override
        public boolean isImmutableType() {
                return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
index 8bcd157..c9eeb34 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -19,6 +19,9 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.reflect.ReflectDatumReader;
@@ -26,13 +29,17 @@ import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.avro.util.Utf8;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.InstantiationUtil;
 
 import com.esotericsoftware.kryo.Kryo;
+import org.apache.flink.util.Preconditions;
 import org.objenesis.strategy.StdInstantiatorStrategy;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -51,6 +58,14 @@ public final class AvroSerializer<T> extends 
TypeSerializer<T> {
        private final Class<T> type;
        
        private final Class<? extends T> typeToInstantiate;
+
+       /**
+        * Map of class tag (using classname as tag) to their Kryo registration.
+        *
+        * <p>This map serves as a preview of the final registration result of
+        * the Kryo instance, taking into account registration overwrites.
+        */
+       private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
        
        private transient ReflectDatumWriter<T> writer;
        private transient ReflectDatumReader<T> reader;
@@ -73,6 +88,8 @@ public final class AvroSerializer<T> extends 
TypeSerializer<T> {
                this.typeToInstantiate = checkNotNull(typeToInstantiate);
                
                InstantiationUtil.checkForInstantiation(typeToInstantiate);
+
+               this.kryoRegistrations = buildKryoRegistrations(type);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -165,14 +182,9 @@ public final class AvroSerializer<T> extends 
TypeSerializer<T> {
                        
instantiatorStrategy.setFallbackInstantiatorStrategy(new 
StdInstantiatorStrategy());
                        kryo.setInstantiatorStrategy(instantiatorStrategy);
 
-                       // register Avro types.
-                       this.kryo.register(GenericData.Array.class, new 
Serializers.SpecificInstanceCollectionSerializerForArrayList());
-                       this.kryo.register(Utf8.class);
-                       this.kryo.register(GenericData.EnumSymbol.class);
-                       this.kryo.register(GenericData.Fixed.class);
-                       this.kryo.register(GenericData.StringType.class);
-                       this.kryo.setAsmEnabled(true);
-                       this.kryo.register(type);
+                       kryo.setAsmEnabled(true);
+
+                       KryoUtils.applyRegistrations(kryo, 
kryoRegistrations.values());
                }
        }
        
@@ -201,4 +213,120 @@ public final class AvroSerializer<T> extends 
TypeSerializer<T> {
        public boolean canEqual(Object obj) {
                return obj instanceof AvroSerializer;
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
+               return new AvroSerializerConfigSnapshot<>(type, 
typeToInstantiate, kryoRegistrations);
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
+                       final AvroSerializerConfigSnapshot<T> config = 
(AvroSerializerConfigSnapshot<T>) configSnapshot;
+
+                       if (type.equals(config.getTypeClass()) && 
typeToInstantiate.equals(config.getTypeToInstantiate())) {
+                               // resolve Kryo registrations; currently, since 
the Kryo registrations in Avro
+                               // are fixed, there shouldn't be a problem with 
the resolution here.
+
+                               LinkedHashMap<String, KryoRegistration> 
oldRegistrations = config.getKryoRegistrations();
+                               oldRegistrations.putAll(kryoRegistrations);
+
+                               for (Map.Entry<String, KryoRegistration> 
reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
+                                       if 
(reconfiguredRegistrationEntry.getValue().isDummy()) {
+                                               return 
CompatibilityResult.requiresMigration(null);
+                                       }
+                               }
+
+                               this.kryoRegistrations = oldRegistrations;
+                               return CompatibilityResult.compatible();
+                       }
+               }
+
+               // ends up here if the preceding serializer is not
+               // the ValueSerializer, or serialized data type has changed
+               return CompatibilityResult.requiresMigration(null);
+       }
+
+       public static class AvroSerializerConfigSnapshot<T> extends 
KryoRegistrationSerializerConfigSnapshot<T> {
+
+               private static final int VERSION = 1;
+
+               private Class<? extends T> typeToInstantiate;
+
+               public AvroSerializerConfigSnapshot() {}
+
+               public AvroSerializerConfigSnapshot(
+                               Class<T> baseType,
+                               Class<? extends T> typeToInstantiate,
+                               LinkedHashMap<String, KryoRegistration> 
kryoRegistrations) {
+
+                       super(baseType, kryoRegistrations);
+                       this.typeToInstantiate = 
Preconditions.checkNotNull(typeToInstantiate);
+               }
+
+               @Override
+               public void write(DataOutputView out) throws IOException {
+                       super.write(out);
+
+                       out.writeUTF(typeToInstantiate.getName());
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public void read(DataInputView in) throws IOException {
+                       super.read(in);
+
+                       String classname = in.readUTF();
+                       try {
+                               typeToInstantiate = (Class<? extends T>) 
Class.forName(classname, true, getUserCodeClassLoader());
+                       } catch (ClassNotFoundException e) {
+                               throw new IOException("Cannot find requested 
class " + classname + " in classpath.", e);
+                       }
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+
+               public Class<? extends T> getTypeToInstantiate() {
+                       return typeToInstantiate;
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               in.defaultReadObject();
+
+               // kryoRegistrations may be null if this Avro serializer is 
deserialized from an old version
+               if (kryoRegistrations == null) {
+                       this.kryoRegistrations = buildKryoRegistrations(type);
+               }
+       }
+
+       private static <T> LinkedHashMap<String, KryoRegistration> 
buildKryoRegistrations(Class<T> serializedDataType) {
+               final LinkedHashMap<String, KryoRegistration> registrations = 
new LinkedHashMap<>();
+
+               // register Avro types.
+               registrations.put(
+                               GenericData.Array.class.getName(),
+                               new KryoRegistration(
+                                               GenericData.Array.class,
+                                               new 
ExecutionConfig.SerializableSerializer<>(new 
Serializers.SpecificInstanceCollectionSerializerForArrayList())));
+               registrations.put(Utf8.class.getName(), new 
KryoRegistration(Utf8.class));
+               registrations.put(GenericData.EnumSymbol.class.getName(), new 
KryoRegistration(GenericData.EnumSymbol.class));
+               registrations.put(GenericData.Fixed.class.getName(), new 
KryoRegistration(GenericData.Fixed.class));
+               registrations.put(GenericData.StringType.class.getName(), new 
KryoRegistration(GenericData.StringType.class));
+
+               // register the serialized data type
+               registrations.put(serializedDataType.getName(), new 
KryoRegistration(serializedDataType));
+
+               return registrations;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
index 1f3fcbc..46b93c2 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
@@ -21,7 +21,10 @@ package org.apache.flink.api.java.typeutils.runtime;
 import java.io.IOException;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import 
org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.CopyableValue;
@@ -30,7 +33,7 @@ import org.apache.flink.util.InstantiationUtil;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 @Internal
-public class CopyableValueSerializer<T extends CopyableValue<T>> extends 
TypeSerializer<T> {
+public final class CopyableValueSerializer<T extends CopyableValue<T>> extends 
TypeSerializer<T> {
 
        private static final long serialVersionUID = 1L;
        
@@ -128,4 +131,41 @@ public class CopyableValueSerializer<T extends 
CopyableValue<T>> extends TypeSer
        public boolean canEqual(Object obj) {
                return obj instanceof CopyableValueSerializer;
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public CopyableValueSerializerConfigSnapshot<T> snapshotConfiguration() 
{
+               return new CopyableValueSerializerConfigSnapshot<>(valueClass);
+       }
+
+       @Override
+       public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof 
CopyableValueSerializerConfigSnapshot
+                               && 
valueClass.equals(((CopyableValueSerializerConfigSnapshot) 
configSnapshot).getTypeClass())) {
+                       return CompatibilityResult.compatible();
+               } else {
+                       return CompatibilityResult.requiresMigration(null);
+               }
+       }
+
+       public static final class CopyableValueSerializerConfigSnapshot<T 
extends CopyableValue<T>>
+                       extends GenericTypeSerializerConfigSnapshot<T> {
+
+               private static final int VERSION = 1;
+
+               /** This empty nullary constructor is required for 
deserializing the configuration. */
+               public CopyableValueSerializerConfigSnapshot() {}
+
+               public CopyableValueSerializerConfigSnapshot(Class<T> 
copyableValueClass) {
+                       super(copyableValueClass);
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
index d9018da..c025d61 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
@@ -19,7 +19,9 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Either;
@@ -183,4 +185,39 @@ public class EitherSerializer<L, R> extends 
TypeSerializer<Either<L, R>> {
        public int hashCode() {
                return 17 * leftSerializer.hashCode() + 
rightSerializer.hashCode();
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public EitherSerializerConfigSnapshot snapshotConfiguration() {
+               return new EitherSerializerConfigSnapshot(
+                               leftSerializer.snapshotConfiguration(),
+                               rightSerializer.snapshotConfiguration());
+       }
+
+       @Override
+       public CompatibilityResult<Either<L, R>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof EitherSerializerConfigSnapshot) {
+                       TypeSerializerConfigSnapshot[] 
leftRightSerializerConfigSnapshots =
+                               ((EitherSerializerConfigSnapshot) 
configSnapshot).getNestedSerializerConfigSnapshots();
+
+                       CompatibilityResult<L> leftCompatResult = 
leftSerializer.ensureCompatibility(leftRightSerializerConfigSnapshots[0]);
+                       CompatibilityResult<R> rightCompatResult = 
rightSerializer.ensureCompatibility(leftRightSerializerConfigSnapshots[1]);
+
+                       if (!leftCompatResult.requiresMigration() && 
!rightCompatResult.requiresMigration()) {
+                               return CompatibilityResult.compatible();
+                       } else {
+                               if (leftCompatResult.getConvertDeserializer() 
!= null && rightCompatResult.getConvertDeserializer() != null) {
+                                       return 
CompatibilityResult.requiresMigration(
+                                               new EitherSerializer<>(
+                                                       
leftCompatResult.getConvertDeserializer(),
+                                                       
rightCompatResult.getConvertDeserializer()));
+                               }
+                       }
+               }
+
+               return CompatibilityResult.requiresMigration(null);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
new file mode 100644
index 0000000..473d438
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerConfigSnapshot.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.types.Either;
+
+/**
+ * Configuration snapshot for serializers of the {@link Either} type,
+ * containing configuration snapshots of the Left and Right serializers.
+ */
+@Internal
+public final class EitherSerializerConfigSnapshot extends 
CompositeTypeSerializerConfigSnapshot {
+
+       private static final int VERSION = 1;
+
+       /** This empty nullary constructor is required for deserializing the 
configuration. */
+       public EitherSerializerConfigSnapshot() {}
+
+       public EitherSerializerConfigSnapshot(
+                       TypeSerializerConfigSnapshot 
leftSerializerConfigSnapshot,
+                       TypeSerializerConfigSnapshot 
rightSerializerConfigSnapshot) {
+
+               super(leftSerializerConfigSnapshot, 
rightSerializerConfigSnapshot);
+       }
+
+       @Override
+       public int getVersion() {
+               return VERSION;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java
new file mode 100644
index 0000000..882073d
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import 
org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * A {@code KryoRegistration} resembles a registered class and its serializer 
in Kryo.
+ */
+@Internal
+public class KryoRegistration implements Serializable {
+
+       private static final long serialVersionUID = 5375110512910892655L;
+
+       /** IMPORTANT: the order of the enumerations must not change, since 
their ordinals are used for serialization. */
+       public enum SerializerDefinitionType {
+               UNSPECIFIED, CLASS, INSTANCE
+       }
+
+       /**
+        * The registered class.
+        *
+        * <p>This can be a dummy class {@link 
KryoRegistrationSerializerConfigSnapshot.DummyRegisteredClass} if
+        * the class no longer exists when this registration instance was 
restored.
+        */
+       private final Class<?> registeredClass;
+
+       /**
+        * Class of the serializer to use for the registered class.
+        * Exists only if the serializer definition type is {@link 
SerializerDefinitionType#CLASS}.
+        *
+        * <p>This can be a dummy serializer {@link 
KryoRegistrationSerializerConfigSnapshot.DummyKryoSerializerClass} if
+        * the serializer class no longer exists when this registration 
instance was restored.
+        */
+       private final Class<? extends Serializer<?>> serializerClass;
+
+       /**
+        * A serializable instance of the serializer to use for the registered 
class.
+        * Exists only if the serializer definition type is {@link 
SerializerDefinitionType#INSTANCE}.
+        *
+        * <p>This can be a dummy serializer {@link 
KryoRegistrationSerializerConfigSnapshot.DummyKryoSerializerClass} if
+        * the serializer class no longer exists or is no longer valid when 
this registration instance was restored.
+        */
+       private final ExecutionConfig.SerializableSerializer<? extends 
Serializer<?>> serializableSerializerInstance;
+
+       private SerializerDefinitionType serializerDefinitionType;
+
+       public KryoRegistration(Class<?> registeredClass) {
+               this.registeredClass = 
Preconditions.checkNotNull(registeredClass);
+
+               this.serializerClass = null;
+               this.serializableSerializerInstance = null;
+
+               this.serializerDefinitionType = 
SerializerDefinitionType.UNSPECIFIED;
+       }
+
+       public KryoRegistration(Class<?> registeredClass, Class<? extends 
Serializer<?>> serializerClass) {
+               this.registeredClass = 
Preconditions.checkNotNull(registeredClass);
+
+               this.serializerClass = 
Preconditions.checkNotNull(serializerClass);
+               this.serializableSerializerInstance = null;
+
+               this.serializerDefinitionType = SerializerDefinitionType.CLASS;
+       }
+
+       public KryoRegistration(
+                       Class<?> registeredClass,
+                       ExecutionConfig.SerializableSerializer<? extends 
Serializer<?>> serializableSerializerInstance) {
+               this.registeredClass = 
Preconditions.checkNotNull(registeredClass);
+
+               this.serializerClass = null;
+               this.serializableSerializerInstance = 
Preconditions.checkNotNull(serializableSerializerInstance);
+
+               this.serializerDefinitionType = 
SerializerDefinitionType.INSTANCE;
+       }
+
+       public Class<?> getRegisteredClass() {
+               return registeredClass;
+       }
+
+       public SerializerDefinitionType getSerializerDefinitionType() {
+               return serializerDefinitionType;
+       }
+
+       public Class<? extends Serializer<?>> getSerializerClass() {
+               return serializerClass;
+       }
+
+       public ExecutionConfig.SerializableSerializer<? extends Serializer<?>> 
getSerializableSerializerInstance() {
+               return serializableSerializerInstance;
+       }
+
+       public Serializer<?> getSerializer(Kryo kryo) {
+               switch (serializerDefinitionType) {
+                       case UNSPECIFIED:
+                               return null;
+                       case CLASS:
+                               return 
ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, 
registeredClass);
+                       case INSTANCE:
+                               return 
serializableSerializerInstance.getSerializer();
+                       default:
+                               // this should not happen; adding as a guard 
for the future
+                               throw new IllegalStateException(
+                                               "Unrecognized Kryo registration 
serializer definition type: " + serializerDefinitionType);
+               }
+       }
+
+       public boolean isDummy() {
+               return 
registeredClass.equals(KryoRegistrationSerializerConfigSnapshot.DummyRegisteredClass.class)
+                               || (serializerClass != null
+                                               && 
serializerClass.equals(KryoRegistrationSerializerConfigSnapshot.DummyKryoSerializerClass.class))
+                               || (serializableSerializerInstance != null
+                                               && 
serializableSerializerInstance.getSerializer() instanceof 
KryoRegistrationSerializerConfigSnapshot.DummyKryoSerializerClass);
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj == this) {
+                       return true;
+               }
+
+               if (obj == null) {
+                       return false;
+               }
+
+               if (obj instanceof KryoRegistration) {
+                       KryoRegistration other = (KryoRegistration) obj;
+
+                       // we cannot include the serializer instances here 
because they don't implement the equals method
+                       return serializerDefinitionType == 
other.serializerDefinitionType
+                               && registeredClass == other.registeredClass
+                               && serializerClass == other.serializerClass;
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               int result = serializerDefinitionType.hashCode();
+               result = 31 * result + registeredClass.hashCode();
+
+               if (serializerClass != null) {
+                       result = 31 * result + serializerClass.hashCode();
+               }
+
+               return result;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
new file mode 100644
index 0000000..3a42d69
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import 
org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InvalidClassException;
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Configuration snapshot base class for serializers that use Kryo for 
serialization.
+ *
+ * <p>The configuration captures the order of Kryo serializer registrations, 
so that new
+ * Kryo serializers can determine how to reconfigure their registration order 
to retain
+ * backwards compatibility.
+ *
+ * @param <T> the data type that the Kryo serializer handles.
+ */
+@Internal
+public abstract class KryoRegistrationSerializerConfigSnapshot<T> extends 
GenericTypeSerializerConfigSnapshot<T> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(KryoRegistrationSerializerConfigSnapshot.class);
+
+       /** Map of class tag to the registration, with ordering. */
+       private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
+
+       /** This empty nullary constructor is required for deserializing the 
configuration. */
+       public KryoRegistrationSerializerConfigSnapshot() {}
+
+       public KryoRegistrationSerializerConfigSnapshot(
+                       Class<T> typeClass,
+                       LinkedHashMap<String, KryoRegistration> 
kryoRegistrations) {
+
+               super(typeClass);
+
+               this.kryoRegistrations = 
Preconditions.checkNotNull(kryoRegistrations);
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               super.write(out);
+
+               out.writeInt(kryoRegistrations.size());
+               for (Map.Entry<String, KryoRegistration> kryoRegistrationEntry 
: kryoRegistrations.entrySet()) {
+                       out.writeUTF(kryoRegistrationEntry.getKey());
+                       new 
KryoRegistrationSerializationProxy<>(kryoRegistrationEntry.getValue()).write(out);
+               }
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               super.read(in);
+
+               int numKryoRegistrations = in.readInt();
+               kryoRegistrations = new LinkedHashMap<>(numKryoRegistrations);
+
+               KryoRegistrationSerializationProxy proxy;
+               for (int i = 0; i < numKryoRegistrations; i++) {
+                       String classTag = in.readUTF();
+
+                       proxy = new 
KryoRegistrationSerializationProxy(getUserCodeClassLoader());
+                       proxy.read(in);
+
+                       kryoRegistrations.put(classTag, proxy.kryoRegistration);
+               }
+       }
+
+       public LinkedHashMap<String, KryoRegistration> getKryoRegistrations() {
+               return kryoRegistrations;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       private static class KryoRegistrationSerializationProxy<RC> implements 
IOReadableWritable {
+
+               private ClassLoader userCodeClassLoader;
+
+               private KryoRegistration kryoRegistration;
+
+               public KryoRegistrationSerializationProxy(ClassLoader 
userCodeClassLoader) {
+                       this.userCodeClassLoader = 
Preconditions.checkNotNull(userCodeClassLoader);
+               }
+
+               public KryoRegistrationSerializationProxy(KryoRegistration 
kryoRegistration) {
+                       this.kryoRegistration = 
Preconditions.checkNotNull(kryoRegistration);
+               }
+
+               @Override
+               public void write(DataOutputView out) throws IOException {
+                       
out.writeUTF(kryoRegistration.getRegisteredClass().getName());
+
+                       final KryoRegistration.SerializerDefinitionType 
serializerDefinitionType = kryoRegistration.getSerializerDefinitionType();
+
+                       out.writeInt(serializerDefinitionType.ordinal());
+                       switch (serializerDefinitionType) {
+                               case UNSPECIFIED:
+                                       // nothing else to write
+                                       break;
+                               case CLASS:
+                                       
out.writeUTF(kryoRegistration.getSerializerClass().getName());
+                                       break;
+                               case INSTANCE:
+                                       InstantiationUtil.serializeObject(new 
DataOutputViewStream(out), 
kryoRegistration.getSerializableSerializerInstance());
+                                       break;
+                               default:
+                                       // this should not happen; adding as a 
guard for the future
+                                       throw new IllegalStateException(
+                                                       "Unrecognized Kryo 
registration serializer definition type: " + serializerDefinitionType);
+                       }
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public void read(DataInputView in) throws IOException {
+                       String registeredClassname = in.readUTF();
+
+                       Class<RC> registeredClass;
+                       try {
+                               registeredClass = (Class<RC>) 
Class.forName(registeredClassname, true, userCodeClassLoader);
+                       } catch (ClassNotFoundException e) {
+                               LOG.warn("Cannot find registered class " + 
registeredClassname + " for Kryo serialization in classpath;" +
+                                       " using a dummy class as a 
placeholder.", e);
+
+                               registeredClass = (Class) 
DummyRegisteredClass.class;
+                       }
+
+                       final KryoRegistration.SerializerDefinitionType 
serializerDefinitionType =
+                               
KryoRegistration.SerializerDefinitionType.values()[in.readInt()];
+
+                       switch (serializerDefinitionType) {
+                               case UNSPECIFIED:
+                                       kryoRegistration = new 
KryoRegistration(registeredClass);
+                                       break;
+
+                               case CLASS:
+                                       String serializerClassname = 
in.readUTF();
+
+                                       Class serializerClass;
+                                       try {
+                                               serializerClass = 
Class.forName(serializerClassname, true, userCodeClassLoader);
+                                       } catch (ClassNotFoundException e) {
+                                               LOG.warn("Cannot find 
registered Kryo serializer class for class " + registeredClassname +
+                                                               " in classpath; 
using a dummy Kryo serializer that should be replaced as soon as" +
+                                                               " a new Kryo 
serializer for the class is present", e);
+
+                                               serializerClass = 
DummyKryoSerializerClass.class;
+                                       }
+
+                                       kryoRegistration = new 
KryoRegistration(registeredClass, serializerClass);
+                                       break;
+
+                               case INSTANCE:
+                                       
ExecutionConfig.SerializableSerializer<? extends Serializer<RC>> 
serializerInstance;
+                                       try {
+                                               serializerInstance = 
InstantiationUtil.deserializeObject(new DataInputViewStream(in), 
userCodeClassLoader);
+                                       } catch (ClassNotFoundException e) {
+                                               LOG.warn("Cannot find 
registered Kryo serializer class for class " + registeredClassname +
+                                                               " in classpath; 
using a dummy Kryo serializer that should be replaced as soon as" +
+                                                               " a new Kryo 
serializer for the class is present", e);
+
+                                               serializerInstance = new 
ExecutionConfig.SerializableSerializer<>(new DummyKryoSerializerClass<RC>());
+                                       } catch (InvalidClassException e) {
+                                               LOG.warn("The registered Kryo 
serializer class for class " + registeredClassname +
+                                                               " has changed 
and is no longer valid; using a dummy Kryo serializer that should be replaced" +
+                                                               " as soon as a 
new Kryo serializer for the class is present.", e);
+
+                                               serializerInstance = new 
ExecutionConfig.SerializableSerializer<>(new DummyKryoSerializerClass<RC>());
+                                       }
+
+                                       kryoRegistration = new 
KryoRegistration(registeredClass, serializerInstance);
+                                       break;
+
+                               default:
+                                       // this should not happen; adding as a 
guard for the future
+                                       throw new IllegalStateException(
+                                                       "Unrecognized Kryo 
registration serializer definition type: " + serializerDefinitionType);
+                       }
+               }
+       }
+
+       /**
+        * Placeholder dummy for a previously registered class that can no 
longer be found in classpath on restore.
+        */
+       public static class DummyRegisteredClass {}
+
+       /**
+        * Placeholder dummmy for a previously registered Kryo serializer that 
is no longer valid or in classpath on restore.
+        */
+       public static class DummyKryoSerializerClass<RC> extends Serializer<RC> 
implements Serializable {
+
+               private static final long serialVersionUID = 
-6172780797425739308L;
+
+               @Override
+               public void write(Kryo kryo, Output output, Object o) {
+                       throw new UnsupportedOperationException(
+                                       "This exception indicates that you're 
trying to write a data type" +
+                                               " that no longer has a valid 
Kryo serializer registered for it.");
+               }
+
+               @Override
+               public Object read(Kryo kryo, Input input, Class aClass) {
+                       throw new UnsupportedOperationException(
+                                       "This exception indicates that you're 
trying to read a data type" +
+                                               " that no longer has a valid 
Kryo serializer registered for it.");
+               }
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               return super.equals(obj)
+                               && 
kryoRegistrations.equals(((KryoSerializer.KryoSerializerConfigSnapshot) 
obj).getKryoRegistrations());
+       }
+
+       @Override
+       public int hashCode() {
+               return super.hashCode() + kryoRegistrations.hashCode();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java
index 50c46e4..0937ac7 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java
@@ -20,11 +20,13 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.Serializer;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.util.InstantiationUtil;
 
 import java.io.IOException;
+import java.util.Collection;
 
 /**
  * Convenience methods for Kryo
@@ -86,4 +88,29 @@ public class KryoUtils {
                        }
                }
        }
+
+       /**
+        * Apply a list of {@link KryoRegistration} to a Kryo instance. The 
list of registrations is
+        * assumed to already be a final resolution of all possible 
registration overwrites.
+        *
+        * <p>The registrations are applied in the given order and always 
specify the registration id as
+        * the next available id in the Kryo instance (providing the id just 
extra ensures nothing is
+        * overwritten, and isn't strictly required);
+        *
+        * @param kryo the Kryo instance to apply the registrations
+        * @param resolvedRegistrations the registrations, which should already 
be resolved of all possible registration overwrites
+        */
+       public static void applyRegistrations(Kryo kryo, 
Collection<KryoRegistration> resolvedRegistrations) {
+
+               Serializer<?> serializer;
+               for (KryoRegistration registration : resolvedRegistrations) {
+                       serializer = registration.getSerializer(kryo);
+
+                       if (serializer != null) {
+                               
kryo.register(registration.getRegisteredClass(), serializer, 
kryo.getNextRegistrationId());
+                       } else {
+                               
kryo.register(registration.getRegisteredClass(), kryo.getNextRegistrationId());
+                       }
+               }
+       }
 }

Reply via email to