Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5230#discussion_r165388777

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java ---
    @@ -77,16 +90,33 @@ public void read(DataInputView in) throws IOException {
     		super.read(in);

     		int numKvStates = in.readShort();
    -		stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
    +		operatorStateMetaInfoSnapshots = new ArrayList<>(numKvStates);
     		for (int i = 0; i < numKvStates; i++) {
    -			stateMetaInfoSnapshots.add(
    -				OperatorBackendStateMetaInfoSnapshotReaderWriters
    -					.getReaderForVersion(getReadVersion(), userCodeClassLoader)
    -					.readStateMetaInfo(in));
    +			operatorStateMetaInfoSnapshots.add(
    +				OperatorBackendStateMetaInfoSnapshotReaderWriters
    +					.getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader)
    +					.readOperatorStateMetaInfo(in));
     		}
    +
    +		if (VERSION >= 3) {
    --- End diff --

Wouldn't this always depend on the version of the code and not the version of the snapshot? That is, if we restore from a `VERSION < 3` snapshot we should not go into this code path. I think here you can get that via `getReadVersion()`.
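
To illustrate the distinction, here is a minimal, self-contained sketch. It does not use Flink's actual classes: `VersionedProxySketch`, `writeSnapshot`, `readFrom`, and the `numExtraStates` field are hypothetical names made up for this example, and the snapshot layout (an `int` version header followed by `short` counts) is an assumption. The point is only that the read path should branch on the version found in the snapshot, since a check against the `VERSION` constant is always true in current code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a versioned serialization proxy (not Flink's API).
class VersionedProxySketch {
    // Version that the current code writes.
    static final int VERSION = 3;

    private int readVersion;   // version found in the snapshot header
    int numStates;             // present in every snapshot version
    int numExtraStates;        // assumed to exist only in snapshots with version >= 3

    int getReadVersion() {
        return readVersion;
    }

    // Simulates a snapshot written by code at the given version.
    static byte[] writeSnapshot(int version, int numStates, int numExtraStates) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(version);
            out.writeShort(numStates);
            if (version >= 3) {
                out.writeShort(numExtraStates);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static VersionedProxySketch readFrom(byte[] snapshot) {
        VersionedProxySketch proxy = new VersionedProxySketch();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
            proxy.readVersion = in.readInt();
            proxy.numStates = in.readShort();
            // Gate on the snapshot's version. `VERSION >= 3` would always be true here
            // and would try to read a field that an older snapshot never wrote.
            if (proxy.getReadVersion() >= 3) {
                proxy.numExtraStates = in.readShort();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return proxy;
    }
}
```

With this gate, a version 2 snapshot is read without touching the newer field, while a version 3 snapshot still restores it. In this sketch, swapping the condition for `VERSION >= 3` makes the version 2 case fail with an end-of-stream error.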
---