Changing the class operators are nested in can break compatibility with existing savepoints. The following piece of code demonstrates this
https://gist.github.com/jelmerk/e55abeb0876539975cd32ad0ced8141a If I change Operators in this file to Operators2 i will not be able to recover from a savepoint that was made when this class still had its old name. The error in the flink ui will be java.lang.IllegalStateException: Could not initialize keyed state backend. at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NullPointerException at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58) at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034) at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283) ... 6 more But the real reason is found in the task manager logs 2018-01-28 17:03:58,830 WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil - Deserialization of serializer errored; replacing with null. java.io.IOException: Unloadable class for type serializer. at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:463) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:189) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:162) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:282) at org.apache.flink.runtime.state.KeyedBackendStateMetaInfoSnapshotReaderWriters$KeyedBackendStateMetaInfoReaderV3.readStateMetaInfo(KeyedBackendStateMetaInfoSnapshotReaderWriters.java:200) at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:152) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1175) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139) at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034) at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.InvalidClassException: failed to read class descriptor at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1611) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:454) ... 17 more Caused by: java.lang.ClassNotFoundException: com.ecg.foo.Main$Operators$$anon$3$$anon$1 at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$FailureTolerantObjectInputStream.readClassDescriptor(TypeSerializerSerializationUtil.java:110) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1609) ... 22 more Is there any way to make this code more robust ? Using java serialization in this way feels very brittle in the face of refactorings.