http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 7152bfc..96025fe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -43,6 +43,8 @@ import 
org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -71,7 +73,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -642,6 +643,139 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                state.value();
        }
 
+       @Test
+       public void testKryoRestoreResilienceWithDifferentRegistrationOrder() 
throws Exception {
+               CheckpointStreamFactory streamFactory = createStreamFactory();
+               Environment env = new DummyEnvironment("test", 1, 0);
+
+               // register A first then B
+               
env.getExecutionConfig().registerKryoType(TestNestedPojoClassA.class);
+               
env.getExecutionConfig().registerKryoType(TestNestedPojoClassB.class);
+
+               AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE, env);
+
+               TypeInformation<TestPojo> pojoType = new 
GenericTypeInfo<>(TestPojo.class);
+
+               // make sure that we are in fact using the KryoSerializer
+               assertTrue(pojoType.createSerializer(env.getExecutionConfig()) 
instanceof KryoSerializer);
+
+               ValueStateDescriptor<TestPojo> kvId = new 
ValueStateDescriptor<>("id", pojoType);
+               ValueState<TestPojo> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+
+               // ============== create snapshot of current configuration 
==============
+
+               // make some more modifications
+               backend.setCurrentKey(1);
+               state.update(new TestPojo("u1", 1, new 
TestNestedPojoClassA(1.0, 2), new TestNestedPojoClassB(2.3, "foo")));
+
+               backend.setCurrentKey(2);
+               state.update(new TestPojo("u2", 2, new 
TestNestedPojoClassA(2.0, 5), new TestNestedPojoClassB(3.1, "bar")));
+
+               KeyedStateHandle snapshot = runSnapshot(backend.snapshot(
+                       682375462378L,
+                       2,
+                       streamFactory,
+                       CheckpointOptions.forFullCheckpoint()));
+
+               backend.dispose();
+
+               // ========== restore snapshot, with a different registration 
order in the configuration ==========
+
+               env = new DummyEnvironment("test", 1, 0);
+
+               
env.getExecutionConfig().registerKryoType(TestNestedPojoClassB.class); // this 
time register B first
+               
env.getExecutionConfig().registerKryoType(TestNestedPojoClassA.class);
+
+               backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, 
env);
+
+               snapshot.discardState();
+
+               // re-initialize to ensure that we create the KryoSerializer 
from scratch, otherwise
+               // initializeSerializerUnlessSet would not pick up our new 
config
+               kvId = new ValueStateDescriptor<>("id", pojoType);
+               state = backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+
+               backend.setCurrentKey(1);
+
+               // update to test state backends that eagerly serialize, such 
as RocksDB
+               state.update(new TestPojo("u1", 11, new 
TestNestedPojoClassA(22.1, 12), new TestNestedPojoClassB(1.23, "foobar")));
+
+               // this tests backends that lazily serialize, such as memory 
state backend
+               runSnapshot(backend.snapshot(
+                       682375462378L,
+                       2,
+                       streamFactory,
+                       CheckpointOptions.forFullCheckpoint()));
+
+               backend.dispose();
+       }
+
+       @Test
+       public void testPojoRestoreResilienceWithDifferentRegistrationOrder() 
throws Exception {
+               CheckpointStreamFactory streamFactory = createStreamFactory();
+               Environment env = new DummyEnvironment("test", 1, 0);
+
+               // register A first then B
+               
env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class);
+               
env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class);
+
+               AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE, env);
+
+               TypeInformation<TestPojo> pojoType = 
TypeExtractor.getForClass(TestPojo.class);
+
+               // make sure that we are in fact using the PojoSerializer
+               assertTrue(pojoType.createSerializer(env.getExecutionConfig()) 
instanceof PojoSerializer);
+
+               ValueStateDescriptor<TestPojo> kvId = new 
ValueStateDescriptor<>("id", pojoType);
+               ValueState<TestPojo> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+
+               // ============== create snapshot of current configuration 
==============
+
+               // make some more modifications
+               backend.setCurrentKey(1);
+               state.update(new TestPojo("u1", 1, new 
TestNestedPojoClassA(1.0, 2), new TestNestedPojoClassB(2.3, "foo")));
+
+               backend.setCurrentKey(2);
+               state.update(new TestPojo("u2", 2, new 
TestNestedPojoClassA(2.0, 5), new TestNestedPojoClassB(3.1, "bar")));
+
+               KeyedStateHandle snapshot = runSnapshot(backend.snapshot(
+                       682375462378L,
+                       2,
+                       streamFactory,
+                       CheckpointOptions.forFullCheckpoint()));
+
+               backend.dispose();
+
+               // ========== restore snapshot, with a different registration 
order in the configuration ==========
+
+               env = new DummyEnvironment("test", 1, 0);
+
+               
env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class); // this 
time register B first
+               
env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class);
+
+               backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, 
env);
+
+               snapshot.discardState();
+
+               // re-initialize to ensure that we create the PojoSerializer 
from scratch, otherwise
+               // initializeSerializerUnlessSet would not pick up our new 
config
+               kvId = new ValueStateDescriptor<>("id", pojoType);
+               state = backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+
+               backend.setCurrentKey(1);
+
+               // update to test state backends that eagerly serialize, such 
as RocksDB
+               state.update(new TestPojo("u1", 11, new 
TestNestedPojoClassA(22.1, 12), new TestNestedPojoClassB(1.23, "foobar")));
+
+               // this tests backends that lazily serialize, such as memory 
state backend
+               runSnapshot(backend.snapshot(
+                       682375462378L,
+                       2,
+                       streamFactory,
+                       CheckpointOptions.forFullCheckpoint()));
+
+               backend.dispose();
+       }
 
        @Test
        @SuppressWarnings("unchecked")
@@ -1696,8 +1830,8 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                state.value();
 
                                fail("should recognize wrong serializers");
-                       } catch (IOException e) {
-                               if (!e.getMessage().contains("Trying to access 
state using wrong")) {
+                       } catch (RuntimeException e) {
+                               if (!e.getMessage().contains("State migration 
currently isn't supported")) {
                                        fail("wrong exception " + e);
                                }
                                // expected
@@ -1747,8 +1881,8 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                state.get();
 
                                fail("should recognize wrong serializers");
-                       } catch (IOException e) {
-                               if (!e.getMessage().contains("Trying to access 
state using wrong")) {
+                       } catch (RuntimeException e) {
+                               if (!e.getMessage().contains("State migration 
currently isn't supported")) {
                                        fail("wrong exception " + e);
                                }
                                // expected
@@ -1800,8 +1934,8 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                state.get();
 
                                fail("should recognize wrong serializers");
-                       } catch (IOException e) {
-                               if (!e.getMessage().contains("Trying to access 
state using wrong ")) {
+                       } catch (RuntimeException e) {
+                               if (!e.getMessage().contains("State migration 
currently isn't supported")) {
                                        fail("wrong exception " + e);
                                }
                                // expected
@@ -1851,8 +1985,8 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                state.entries();
 
                                fail("should recognize wrong serializers");
-                       } catch (IOException e) {
-                               if (!e.getMessage().contains("Trying to access 
state using wrong ")) {
+                       } catch (RuntimeException e) {
+                               if (!e.getMessage().contains("State migration 
currently isn't supported")) {
                                        fail("wrong exception " + e);
                                }
                                // expected
@@ -2382,15 +2516,27 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                return snapshotRunnableFuture.get();
        }
 
-       private static class TestPojo implements Serializable {
+       public static class TestPojo implements Serializable {
                private String strField;
                private Integer intField;
 
+               private TestNestedPojoClassA kryoClassAField;
+               private TestNestedPojoClassB kryoClassBField;
+
                public TestPojo() {}
 
                public TestPojo(String strField, Integer intField) {
                        this.strField = strField;
                        this.intField = intField;
+                       this.kryoClassAField = null;
+                       this.kryoClassBField = null;
+               }
+
+               public TestPojo(String strField, Integer intField, 
TestNestedPojoClassA classAField, TestNestedPojoClassB classBfield) {
+                       this.strField = strField;
+                       this.intField = intField;
+                       this.kryoClassAField = classAField;
+                       this.kryoClassBField = classBfield;
                }
 
                public String getStrField() {
@@ -2409,6 +2555,22 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        this.intField = intField;
                }
 
+               public TestNestedPojoClassA getKryoClassAField() {
+                       return kryoClassAField;
+               }
+
+               public void setKryoClassAField(TestNestedPojoClassA 
kryoClassAField) {
+                       this.kryoClassAField = kryoClassAField;
+               }
+
+               public TestNestedPojoClassB getKryoClassBField() {
+                       return kryoClassBField;
+               }
+
+               public void setKryoClassBField(TestNestedPojoClassB 
kryoClassBField) {
+                       this.kryoClassBField = kryoClassBField;
+               }
+
                @Override
                public String toString() {
                        return "TestPojo{" +
@@ -2424,14 +2586,133 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                        TestPojo testPojo = (TestPojo) o;
 
-                       if (!strField.equals(testPojo.strField)) return false;
-                       return intField.equals(testPojo.intField);
+                       return strField.equals(testPojo.strField)
+                               && intField.equals(testPojo.intField)
+                               && ((kryoClassAField == null && 
testPojo.kryoClassAField == null) || 
kryoClassAField.equals(testPojo.kryoClassAField))
+                               && ((kryoClassBField == null && 
testPojo.kryoClassBField == null) || 
kryoClassBField.equals(testPojo.kryoClassBField));
                }
 
                @Override
                public int hashCode() {
                        int result = strField.hashCode();
                        result = 31 * result + intField.hashCode();
+
+                       if (kryoClassAField != null) {
+                               result = 31 * result + 
kryoClassAField.hashCode();
+                       }
+
+                       if (kryoClassBField != null) {
+                               result = 31 * result + 
kryoClassBField.hashCode();
+                       }
+
+                       return result;
+               }
+       }
+
+       public static class TestNestedPojoClassA implements Serializable {
+               private Double doubleField;
+               private Integer intField;
+
+               public TestNestedPojoClassA() {}
+
+               public TestNestedPojoClassA(Double doubleField, Integer 
intField) {
+                       this.doubleField = doubleField;
+                       this.intField = intField;
+               }
+
+               public Double getDoubleField() {
+                       return doubleField;
+               }
+
+               public void setDoubleField(Double doubleField) {
+                       this.doubleField = doubleField;
+               }
+
+               public Integer getIntField() {
+                       return intField;
+               }
+
+               public void setIntField(Integer intField) {
+                       this.intField = intField;
+               }
+
+               @Override
+               public String toString() {
+                       return "TestNestedPojoClassA{" +
+                               "doubleField='" + doubleField + '\'' +
+                               ", intField=" + intField +
+                               '}';
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) return true;
+                       if (o == null || getClass() != o.getClass()) return 
false;
+
+                       TestNestedPojoClassA testNestedPojoClassA = 
(TestNestedPojoClassA) o;
+
+                       if 
(!doubleField.equals(testNestedPojoClassA.doubleField)) return false;
+                       return intField.equals(testNestedPojoClassA.intField);
+               }
+
+               @Override
+               public int hashCode() {
+                       int result = doubleField.hashCode();
+                       result = 31 * result + intField.hashCode();
+                       return result;
+               }
+       }
+
+       public static class TestNestedPojoClassB implements Serializable {
+               private Double doubleField;
+               private String strField;
+
+               public TestNestedPojoClassB() {}
+
+               public TestNestedPojoClassB(Double doubleField, String 
strField) {
+                       this.doubleField = doubleField;
+                       this.strField = strField;
+               }
+
+               public Double getDoubleField() {
+                       return doubleField;
+               }
+
+               public void setDoubleField(Double doubleField) {
+                       this.doubleField = doubleField;
+               }
+
+               public String getStrField() {
+                       return strField;
+               }
+
+               public void setStrField(String strField) {
+                       this.strField = strField;
+               }
+
+               @Override
+               public String toString() {
+                       return "TestNestedPojoClassB{" +
+                               "doubleField='" + doubleField + '\'' +
+                               ", strField=" + strField +
+                               '}';
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) return true;
+                       if (o == null || getClass() != o.getClass()) return 
false;
+
+                       TestNestedPojoClassB testNestedPojoClassB = 
(TestNestedPojoClassB) o;
+
+                       if 
(!doubleField.equals(testNestedPojoClassB.doubleField)) return false;
+                       return strField.equals(testNestedPojoClassB.strField);
+               }
+
+               @Override
+               public int hashCode() {
+                       int result = doubleField.hashCode();
+                       result = 31 * result + strField.hashCode();
                        return result;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
index 976b9aa..8289821 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
@@ -19,7 +19,9 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -29,7 +31,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
@@ -50,8 +52,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
         */
        @Test
        public void testPutGetRemoveContainsTransform() throws Exception {
-               RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> 
metaInfo =
-                               new RegisteredBackendStateMetaInfo<>(
+               RegisteredKeyedBackendStateMetaInfo<Integer, 
ArrayList<Integer>> metaInfo =
+                               new RegisteredKeyedBackendStateMetaInfo<>(
                                                StateDescriptor.Type.UNKNOWN,
                                                "test",
                                                IntSerializer.INSTANCE,
@@ -122,8 +124,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
         */
        @Test
        public void testIncrementalRehash() {
-               RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> 
metaInfo =
-                               new RegisteredBackendStateMetaInfo<>(
+               RegisteredKeyedBackendStateMetaInfo<Integer, 
ArrayList<Integer>> metaInfo =
+                               new RegisteredKeyedBackendStateMetaInfo<>(
                                                StateDescriptor.Type.UNKNOWN,
                                                "test",
                                                IntSerializer.INSTANCE,
@@ -167,8 +169,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
        @Test
        public void testRandomModificationsAndCopyOnWriteIsolation() throws 
Exception {
 
-               final RegisteredBackendStateMetaInfo<Integer, 
ArrayList<Integer>> metaInfo =
-                               new RegisteredBackendStateMetaInfo<>(
+               final RegisteredKeyedBackendStateMetaInfo<Integer, 
ArrayList<Integer>> metaInfo =
+                               new RegisteredKeyedBackendStateMetaInfo<>(
                                                StateDescriptor.Type.UNKNOWN,
                                                "test",
                                                IntSerializer.INSTANCE,
@@ -322,8 +324,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
         */
        @Test
        public void testCopyOnWriteContracts() {
-               RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> 
metaInfo =
-                               new RegisteredBackendStateMetaInfo<>(
+               RegisteredKeyedBackendStateMetaInfo<Integer, 
ArrayList<Integer>> metaInfo =
+                               new RegisteredKeyedBackendStateMetaInfo<>(
                                                StateDescriptor.Type.UNKNOWN,
                                                "test",
                                                IntSerializer.INSTANCE,
@@ -397,8 +399,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
                final TestDuplicateSerializer stateSerializer = new 
TestDuplicateSerializer();;
                final TestDuplicateSerializer keySerializer = new 
TestDuplicateSerializer();;
 
-               RegisteredBackendStateMetaInfo<Integer, Integer> metaInfo =
-                       new RegisteredBackendStateMetaInfo<>(
+               RegisteredKeyedBackendStateMetaInfo<Integer, Integer> metaInfo =
+                       new RegisteredKeyedBackendStateMetaInfo<>(
                                StateDescriptor.Type.VALUE,
                                "test",
                                namespaceSerializer,
@@ -649,5 +651,15 @@ public class CopyOnWriteStateTableTest extends TestLogger {
                public void disable() {
                        this.disabled = true;
                }
+
+               @Override
+               public TypeSerializerConfigSnapshot snapshotConfiguration() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public CompatibilityResult<Integer> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+                       throw new UnsupportedOperationException();
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
index 6fd94f7..85bc177 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
@@ -27,7 +27,7 @@ import 
org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -44,8 +44,8 @@ public class StateTableSnapshotCompatibilityTest {
        @Test
        public void checkCompatibleSerializationFormats() throws IOException {
                final Random r = new Random(42);
-               RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> 
metaInfo =
-                               new RegisteredBackendStateMetaInfo<>(
+               RegisteredKeyedBackendStateMetaInfo<Integer, 
ArrayList<Integer>> metaInfo =
+                               new RegisteredKeyedBackendStateMetaInfo<>(
                                                StateDescriptor.Type.UNKNOWN,
                                                "test",
                                                IntSerializer.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java
index 146ccd0..ce23f30 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordSerializer.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.testutils.recordutils;
 
 import java.io.IOException;
 
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Record;
@@ -141,4 +143,14 @@ public final class RecordSerializer extends 
TypeSerializer<Record> {
        public int hashCode() {
                return RecordSerializer.class.hashCode();
        }
+
+       @Override
+       public TypeSerializerConfigSnapshot snapshotConfiguration() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompatibilityResult<Record> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               throw new UnsupportedOperationException();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 6871159..468fddc 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -18,8 +18,9 @@
 package org.apache.flink.api.scala.typeutils
 
 import org.apache.flink.annotation.Internal
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+import org.apache.flink.api.common.typeutils.{CompatibilityResult, 
TypeSerializer, TypeSerializerConfigSnapshot}
+import 
org.apache.flink.api.java.typeutils.runtime.EitherSerializerConfigSnapshot
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 
 /**
  * Serializer for [[Either]].
@@ -104,4 +105,47 @@ class EitherSerializer[A, B, T <: Either[A, B]](
   override def hashCode(): Int = {
     31 * leftSerializer.hashCode() + rightSerializer.hashCode()
   }
+
+  // 
--------------------------------------------------------------------------------------------
+  // Serializer configuration snapshotting & compatibility
+  // 
--------------------------------------------------------------------------------------------
+
+  override def snapshotConfiguration(): EitherSerializerConfigSnapshot = {
+    new EitherSerializerConfigSnapshot(
+      leftSerializer.snapshotConfiguration(),
+      rightSerializer.snapshotConfiguration())
+  }
+
+  override def ensureCompatibility(
+      configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[T] = {
+
+    configSnapshot match {
+      case eitherSerializerConfig: EitherSerializerConfigSnapshot =>
+        val leftRightConfigs =
+          eitherSerializerConfig.getNestedSerializerConfigSnapshots
+
+        val leftCompatResult = 
leftSerializer.ensureCompatibility(leftRightConfigs(0))
+        val rightCompatResult = 
rightSerializer.ensureCompatibility(leftRightConfigs(1))
+
+        if (leftCompatResult.requiresMigration || 
rightCompatResult.requiresMigration) {
+          if (leftCompatResult.getConvertDeserializer != null
+              && rightCompatResult.getConvertDeserializer != null) {
+
+            CompatibilityResult.requiresMigration(
+              new EitherSerializer[A, B, T](
+                leftCompatResult.getConvertDeserializer,
+                rightCompatResult.getConvertDeserializer
+              )
+            )
+
+          } else {
+            CompatibilityResult.requiresMigration(null)
+          }
+        } else {
+          CompatibilityResult.compatible()
+        }
+
+      case _ => CompatibilityResult.requiresMigration(null)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
index 67c1445..dc96c98 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
@@ -17,10 +17,14 @@
  */
 package org.apache.flink.api.scala.typeutils
 
+import java.io.IOException
+
 import org.apache.flink.annotation.Internal
-import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.{CompatibilityResult, 
TypeSerializer, TypeSerializerConfigSnapshot}
 import org.apache.flink.api.common.typeutils.base.IntSerializer
+import org.apache.flink.api.java.typeutils.runtime.{DataInputViewStream, 
DataOutputViewStream}
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.util.InstantiationUtil
 
 /**
  * Serializer for [[Enumeration]] values.
@@ -67,4 +71,114 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) 
extends TypeSerializer[
   override def canEqual(obj: scala.Any): Boolean = {
     obj.isInstanceOf[EnumValueSerializer[_]]
   }
+
+  // 
--------------------------------------------------------------------------------------------
+  // Serializer configuration snapshotting & compatibility
+  // 
--------------------------------------------------------------------------------------------
+
+  override def snapshotConfiguration(): 
EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E] = {
+    new EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E](
+      enum.getClass.asInstanceOf[Class[E]])
+  }
+
+  override def ensureCompatibility(
+      configSnapshot: TypeSerializerConfigSnapshot): 
CompatibilityResult[E#Value] = {
+
+    configSnapshot match {
+      case enumSerializerConfigSnapshot: 
EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[_] =>
+        val enumClass = enum.getClass.asInstanceOf[Class[E]]
+        if (enumClass.equals(enumSerializerConfigSnapshot.getEnumClass)) {
+          val currentEnumConstants = 
enumSerializerConfigSnapshot.getEnumClass.getEnumConstants
+
+          for ( i <- 0 to currentEnumConstants.length) {
+            // compatible only if new enum constants are only appended,
+            // and original constants must be in the exact same order
+
+            if (currentEnumConstants(i) != 
enumSerializerConfigSnapshot.getEnumConstants(i)) {
+              CompatibilityResult.requiresMigration(null)
+            }
+          }
+
+          CompatibilityResult.compatible()
+        } else {
+          CompatibilityResult.requiresMigration(null)
+        }
+
+      case _ => CompatibilityResult.requiresMigration(null)
+    }
+  }
+}
+
+object EnumValueSerializer {
+
+  class ScalaEnumSerializerConfigSnapshot[E <: Enumeration](private var 
enumClass: Class[E])
+      extends TypeSerializerConfigSnapshot {
+
+    var enumConstants: Array[E] = enumClass.getEnumConstants
+
+    /** This empty nullary constructor is required for deserializing the 
configuration. */
+    def this() = this(null)
+
+    override def write(out: DataOutputView): Unit = {
+      super.write(out)
+
+      try {
+        val outViewWrapper = new DataOutputViewStream(out)
+        try {
+          InstantiationUtil.serializeObject(outViewWrapper, enumClass)
+          InstantiationUtil.serializeObject(outViewWrapper, enumConstants)
+        } finally if (outViewWrapper != null) outViewWrapper.close()
+      }
+    }
+
+    override def read(in: DataInputView): Unit = {
+      super.read(in)
+
+      try {
+        val inViewWrapper = new DataInputViewStream(in)
+        try
+          try {
+            enumClass = InstantiationUtil.deserializeObject(
+              inViewWrapper, getUserCodeClassLoader)
+
+            enumConstants = InstantiationUtil.deserializeObject(
+              inViewWrapper, getUserCodeClassLoader)
+          } catch {
+            case e: ClassNotFoundException =>
+              throw new IOException("The requested enum class cannot be found 
in classpath.", e)
+          }
+          finally if (inViewWrapper != null) inViewWrapper.close()
+      }
+    }
+
+    override def getVersion: Int = ScalaEnumSerializerConfigSnapshot.VERSION
+
+    def getEnumClass: Class[E] = enumClass
+
+    def getEnumConstants: Array[E] = enumConstants
+
+    override def equals(obj: scala.Any): Boolean = {
+      if (obj == this) {
+        return true
+      }
+
+      if (obj == null) {
+        return false
+      }
+
+      obj.isInstanceOf[ScalaEnumSerializerConfigSnapshot[E]] &&
+        
enumClass.equals(obj.asInstanceOf[ScalaEnumSerializerConfigSnapshot[E]].enumClass)
 &&
+        enumConstants.sameElements(
+          obj.asInstanceOf[ScalaEnumSerializerConfigSnapshot[E]].enumConstants)
+    }
+
+    override def hashCode(): Int = {
+      enumClass.hashCode() * 31 + enumConstants.toSeq.hashCode()
+    }
+  }
+
+  object ScalaEnumSerializerConfigSnapshot {
+    val VERSION = 1
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
index fa5279e..01ca295 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
@@ -18,8 +18,8 @@
 package org.apache.flink.api.scala.typeutils
 
 import org.apache.flink.annotation.Internal
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+import org.apache.flink.api.common.typeutils.{CompatibilityResult, 
TypeSerializer, TypeSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 
 /**
  * Serializer for cases where no serializer is required but the system still 
expects one. This
@@ -50,13 +50,19 @@ class NothingSerializer extends TypeSerializer[Any] {
   override def serialize(any: Any, target: DataOutputView): Unit =
     throw new RuntimeException("This must not be used. You encountered a bug.")
 
-
   override def deserialize(source: DataInputView): Any =
     throw new RuntimeException("This must not be used. You encountered a bug.")
 
   override def deserialize(reuse: Any, source: DataInputView): Any =
     throw new RuntimeException("This must not be used. You encountered a bug.")
 
+  override def snapshotConfiguration(): TypeSerializerConfigSnapshot =
+    throw new RuntimeException("This must not be used. You encountered a bug.")
+
+  override def ensureCompatibility(
+      configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[Any] =
+    throw new RuntimeException("This must not be used. You encountered a bug.")
+
   override def equals(obj: Any): Boolean = {
     obj match {
       case nothingSerializer: NothingSerializer => 
nothingSerializer.canEqual(this)

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index a8b3a56..d2bb098 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -18,8 +18,8 @@
 package org.apache.flink.api.scala.typeutils
 
 import org.apache.flink.annotation.Internal
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 
 /**
  * Serializer for [[Option]].
@@ -95,4 +95,52 @@ class OptionSerializer[A](val elemSerializer: 
TypeSerializer[A])
   override def hashCode(): Int = {
     elemSerializer.hashCode()
   }
+
+  // 
--------------------------------------------------------------------------------------------
+  // Serializer configuration snapshotting & compatibility
+  // 
--------------------------------------------------------------------------------------------
+
+  override def snapshotConfiguration(): 
OptionSerializer.OptionSerializerConfigSnapshot = {
+    new 
OptionSerializer.OptionSerializerConfigSnapshot(elemSerializer.snapshotConfiguration())
+  }
+
+  override def ensureCompatibility(
+      configSnapshot: TypeSerializerConfigSnapshot): 
CompatibilityResult[Option[A]] = {
+    configSnapshot match {
+      case optionSerializerConfigSnapshot: 
OptionSerializer.OptionSerializerConfigSnapshot =>
+        val compatResult = elemSerializer.ensureCompatibility(
+          
optionSerializerConfigSnapshot.getSingleNestedSerializerConfigSnapshot)
+
+        if (compatResult.requiresMigration()) {
+          if (compatResult.getConvertDeserializer != null) {
+            CompatibilityResult.requiresMigration(
+              new OptionSerializer[A](compatResult.getConvertDeserializer))
+          } else {
+            CompatibilityResult.requiresMigration(null)
+          }
+        } else {
+          CompatibilityResult.compatible()
+        }
+
+      case _ => CompatibilityResult.requiresMigration(null)
+    }
+  }
+}
+
+object OptionSerializer {
+
+  class OptionSerializerConfigSnapshot(
+      private var elemSerializerConfigSnapshot: TypeSerializerConfigSnapshot)
+    extends 
CompositeTypeSerializerConfigSnapshot(elemSerializerConfigSnapshot) {
+
+    /** This empty nullary constructor is required for deserializing the 
configuration. */
+    def this() = this(null)
+
+    override def getVersion: Int = OptionSerializerConfigSnapshot.VERSION
+  }
+
+  object OptionSerializerConfigSnapshot {
+    val VERSION = 1
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
index d1b9085..1ac46f9 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
@@ -20,8 +20,8 @@ package org.apache.flink.api.scala.typeutils
 import java.io.ObjectInputStream
 
 import org.apache.flink.annotation.Internal
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+import org.apache.flink.api.common.typeutils.{CompatibilityResult, 
TypeSerializer, TypeSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 
 import scala.collection.generic.CanBuildFrom
 
@@ -150,4 +150,13 @@ abstract class TraversableSerializer[T <: 
TraversableOnce[E], E](
   override def canEqual(obj: Any): Boolean = {
     obj.isInstanceOf[TraversableSerializer[_, _]]
   }
+
+  override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
+    throw new UnsupportedOperationException()
+  }
+
+  override def ensureCompatibility(
+      configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[T] = {
+    throw new UnsupportedOperationException()
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
index a5ec03a..c864dc7 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
@@ -19,11 +19,12 @@ package org.apache.flink.api.scala.typeutils
 
 import org.apache.flink.annotation.Internal
 import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils._
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
+import 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.KryoSerializerConfigSnapshot
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 
-import scala.util.{Success, Try, Failure}
+import scala.util.{Failure, Success, Try}
 
 /**
  * Serializer for [[scala.util.Try]].
@@ -98,4 +99,57 @@ class TrySerializer[A](
   override def hashCode(): Int = {
     31 * elemSerializer.hashCode() + executionConfig.hashCode()
   }
+
+  // 
--------------------------------------------------------------------------------------------
+  // Serializer configuration snapshotting & compatibility
+  // 
--------------------------------------------------------------------------------------------
+
+  override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
+    new TrySerializer.TrySerializerConfigSnapshot(
+        elemSerializer.snapshotConfiguration(),
+        throwableSerializer.snapshotConfiguration())
+  }
+
+  override def ensureCompatibility(
+      configSnapshot: TypeSerializerConfigSnapshot): 
CompatibilityResult[Try[A]] = {
+
+    configSnapshot match {
+      case trySerializerConfigSnapshot: 
TrySerializer.TrySerializerConfigSnapshot =>
+        val serializerConfigSnapshots =
+          trySerializerConfigSnapshot.getNestedSerializerConfigSnapshots
+
+        val elemCompatRes =
+          elemSerializer.ensureCompatibility(serializerConfigSnapshots(0))
+        val throwableCompatRes =
+          throwableSerializer.ensureCompatibility(serializerConfigSnapshots(1))
+
+        if (elemCompatRes.requiresMigration() || 
throwableCompatRes.requiresMigration()) {
+          CompatibilityResult.requiresMigration(null)
+        } else {
+          CompatibilityResult.compatible()
+        }
+
+      case _ => CompatibilityResult.requiresMigration(null)
+    }
+  }
+}
+
+object TrySerializer {
+
+  class TrySerializerConfigSnapshot(
+      private var elemSerializerConfigSnapshot: TypeSerializerConfigSnapshot,
+      private var throwableSerializerConfigSnapshot: 
KryoSerializerConfigSnapshot[Throwable])
+    extends CompositeTypeSerializerConfigSnapshot(
+      elemSerializerConfigSnapshot, throwableSerializerConfigSnapshot) {
+
+    /** This empty nullary constructor is required for deserializing the 
configuration. */
+    def this() = this(null, null)
+
+    override def getVersion: Int = TrySerializerConfigSnapshot.VERSION
+  }
+
+  object TrySerializerConfigSnapshot {
+    val VERSION = 1
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
index fdcd5b8..53fea46 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -21,7 +21,11 @@ package 
org.apache.flink.migration.streaming.runtime.streamrecord;
 import static java.util.Objects.requireNonNull;
 
 import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -205,6 +209,53 @@ public class MultiplexingStreamRecordSerializer<T> extends 
TypeSerializer<Stream
                }
        }
 
+       // 
--------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public MultiplexingStreamRecordSerializerConfigSnapshot 
snapshotConfiguration() {
+               return new 
MultiplexingStreamRecordSerializerConfigSnapshot(typeSerializer.snapshotConfiguration());
+       }
+
+       @Override
+       public CompatibilityResult<StreamElement> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof 
MultiplexingStreamRecordSerializerConfigSnapshot) {
+                       CompatibilityResult<T> compatResult = 
typeSerializer.ensureCompatibility(
+                               
((MultiplexingStreamRecordSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerConfigSnapshot());
+
+                       if (!compatResult.requiresMigration()) {
+                               return CompatibilityResult.compatible();
+                       } else if (compatResult.getConvertDeserializer() != 
null) {
+                               return CompatibilityResult.requiresMigration(
+                                       new 
MultiplexingStreamRecordSerializer<>(compatResult.getConvertDeserializer()));
+                       }
+               }
+
+               return CompatibilityResult.requiresMigration(null);
+       }
+
+       /**
+        * Configuration snapshot specific to the {@link 
MultiplexingStreamRecordSerializer}.
+        */
+       public static final class 
MultiplexingStreamRecordSerializerConfigSnapshot
+                       extends CompositeTypeSerializerConfigSnapshot {
+
+               private static final int VERSION = 1;
+
+               /** This empty nullary constructor is required for 
deserializing the configuration. */
+               public MultiplexingStreamRecordSerializerConfigSnapshot() {}
+
+               public 
MultiplexingStreamRecordSerializerConfigSnapshot(TypeSerializerConfigSnapshot 
typeSerializerConfigSnapshot) {
+                       super(typeSerializerConfigSnapshot);
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
index 2c8dc4a..2a87f4e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -20,7 +20,10 @@ package 
org.apache.flink.migration.streaming.runtime.streamrecord;
 
 import java.io.IOException;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -144,4 +147,50 @@ public final class StreamRecordSerializer<T> extends 
TypeSerializer<StreamRecord
        public int hashCode() {
                return typeSerializer.hashCode();
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public StreamRecordSerializerConfigSnapshot snapshotConfiguration() {
+               return new 
StreamRecordSerializerConfigSnapshot(typeSerializer.snapshotConfiguration());
+       }
+
+       @Override
+       public CompatibilityResult<StreamRecord<T>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof 
StreamRecordSerializerConfigSnapshot) {
+                       CompatibilityResult<T> compatResult = 
typeSerializer.ensureCompatibility(
+                               ((StreamRecordSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerConfigSnapshot());
+
+                       if (!compatResult.requiresMigration()) {
+                               return 
CompatibilityResult.requiresMigration(null);
+                       } else if (compatResult.getConvertDeserializer() != 
null) {
+                               return CompatibilityResult.requiresMigration(
+                                       new 
StreamRecordSerializer<>(compatResult.getConvertDeserializer()));
+                       }
+               }
+
+               return CompatibilityResult.requiresMigration(null);
+       }
+
+       /**
+        * Configuration snapshot specific to the {@link 
StreamRecordSerializer}.
+        */
+       public static final class StreamRecordSerializerConfigSnapshot extends 
CompositeTypeSerializerConfigSnapshot {
+
+               private static final int VERSION = 1;
+
+               /** This empty nullary constructor is required for 
deserializing the configuration. */
+               public StreamRecordSerializerConfigSnapshot() {}
+
+               public 
StreamRecordSerializerConfigSnapshot(TypeSerializerConfigSnapshot 
typeSerializerConfigSnapshot) {
+                       super(typeSerializerConfigSnapshot);
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 13a8a24..f0c3dc2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -29,7 +29,9 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -547,6 +549,16 @@ public class CoGroupedStreams<T1, T2> {
                public boolean canEqual(Object obj) {
                        return obj instanceof UnionSerializer;
                }
+
+               @Override
+               public TypeSerializerConfigSnapshot snapshotConfiguration() {
+                       throw new UnsupportedOperationException("This 
serializer is not registered for managed state.");
+               }
+
+               @Override
+               public CompatibilityResult<TaggedUnion<T1, T2>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+                       throw new UnsupportedOperationException("This 
serializer is not registered for managed state.");
+               }
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
index 7b6ba8d..1455712 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
@@ -19,7 +19,9 @@ package org.apache.flink.streaming.api.operators;
 
 import java.io.IOException;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -184,5 +186,15 @@ public class InternalTimer<K, N> implements 
Comparable<InternalTimer<K, N>> {
                public int hashCode() {
                        return getClass().hashCode();
                }
+
+               @Override
+               public TypeSerializerConfigSnapshot snapshotConfiguration() {
+                       throw new UnsupportedOperationException("This 
serializer is not registered for managed state.");
+               }
+
+               @Override
+               public CompatibilityResult<InternalTimer<K, N>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+                       throw new UnsupportedOperationException("This 
serializer is not registered for managed state.");
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
index b1e06f5..cf5c74c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.windowing.windows;
 import java.io.IOException;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -61,7 +62,7 @@ public class GlobalWindow extends Window {
        /**
         * A {@link TypeSerializer} for {@link GlobalWindow}.
         */
-       public static class Serializer extends TypeSerializer<GlobalWindow> {
+       public static class Serializer extends 
TypeSerializerSingleton<GlobalWindow> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -70,11 +71,6 @@ public class GlobalWindow extends Window {
                }
 
                @Override
-               public TypeSerializer<GlobalWindow> duplicate() {
-                       return this;
-               }
-
-               @Override
                public GlobalWindow createInstance() {
                        return GlobalWindow.INSTANCE;
                }
@@ -119,18 +115,8 @@ public class GlobalWindow extends Window {
                }
 
                @Override
-               public boolean equals(Object obj) {
-                       return obj instanceof Serializer;
-               }
-
-               @Override
                public boolean canEqual(Object obj) {
                        return obj instanceof Serializer;
                }
-
-               @Override
-               public int hashCode() {
-                       return 0;
-               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
index 6d896cb..a7ea244 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -26,7 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -130,7 +130,7 @@ public class TimeWindow extends Window {
        /**
         * The serializer used to write the TimeWindow type.
         */
-       public static class Serializer extends TypeSerializer<TimeWindow> {
+       public static class Serializer extends 
TypeSerializerSingleton<TimeWindow> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -139,11 +139,6 @@ public class TimeWindow extends Window {
                }
 
                @Override
-               public TypeSerializer<TimeWindow> duplicate() {
-                       return this;
-               }
-
-               @Override
                public TimeWindow createInstance() {
                        return null;
                }
@@ -188,19 +183,9 @@ public class TimeWindow extends Window {
                }
 
                @Override
-               public boolean equals(Object obj) {
-                       return obj instanceof Serializer;
-               }
-
-               @Override
                public boolean canEqual(Object obj) {
                        return obj instanceof Serializer;
                }
-
-               @Override
-               public int hashCode() {
-                       return 0;
-               }
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index 7fe088a..5c52fa6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -22,7 +22,10 @@ import static java.util.Objects.requireNonNull;
 
 import java.io.IOException;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -262,4 +265,54 @@ public final class StreamElementSerializer<T> extends 
TypeSerializer<StreamEleme
        public int hashCode() {
                return typeSerializer.hashCode();
        }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Serializer configuration snapshotting & compatibility
+       //
+       // This serializer may be used by Flink internal operators that need to 
checkpoint
+       // buffered records. Therefore, it may be part of managed state and 
need to implement
+       // the configuration snapshot and compatibility methods.
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public StreamElementSerializerConfigSnapshot snapshotConfiguration() {
+               return new 
StreamElementSerializerConfigSnapshot(typeSerializer.snapshotConfiguration());
+       }
+
+       @Override
+       public CompatibilityResult<StreamElement> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof 
StreamElementSerializerConfigSnapshot) {
+                       CompatibilityResult<T> compatResult = 
typeSerializer.ensureCompatibility(
+                               ((StreamElementSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerConfigSnapshot());
+
+                       if (!compatResult.requiresMigration()) {
+                               return CompatibilityResult.compatible();
+                       } else if (compatResult.getConvertDeserializer() != 
null) {
+                               return CompatibilityResult.requiresMigration(
+                                       new 
StreamElementSerializer<>(compatResult.getConvertDeserializer()));
+                       }
+               }
+
+               return CompatibilityResult.requiresMigration(null);
+       }
+
+       /**
+        * Configuration snapshot specific to the {@link 
StreamElementSerializer}.
+        */
+       public static final class StreamElementSerializerConfigSnapshot extends 
CompositeTypeSerializerConfigSnapshot {
+
+               private static final int VERSION = 1;
+
+               /** This empty nullary constructor is required for 
deserializing the configuration. */
+               public StreamElementSerializerConfigSnapshot() {}
+
+               public 
StreamElementSerializerConfigSnapshot(TypeSerializerConfigSnapshot 
typeSerializerConfigSnapshot) {
+                       super(typeSerializerConfigSnapshot);
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
index a24a3a8..2693bc1 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
@@ -228,6 +228,5 @@ public class CheckpointingCustomKvStateProgram {
                public boolean canEqual(Object obj) {
                        return obj instanceof CustomIntSerializer;
                }
-
        }
 }

Reply via email to