Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163935632

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java ---
@@ -211,4 +297,34 @@ public OperatorBackendStateMetaInfoReaderV2(ClassLoader userCodeClassLoader) {
             return stateMetaInfo;
         }
     }
+
+    public static class BroadcastStateMetaInfoReaderV2<K, V> extends AbstractBroadcastStateMetaInfoReader<K, V> {
+
+        public BroadcastStateMetaInfoReaderV2(final ClassLoader userCodeClassLoader) {
+            super(userCodeClassLoader);
+        }
+
+        @Override
+        public RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> readBroadcastStateMetaInfo(final DataInputView in) throws IOException {
+            RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> stateMetaInfo =
+                new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>();
+
+            stateMetaInfo.setName(in.readUTF());
+            stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]);
+
+            Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> keySerializerAndConfig =
+                TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader).get(0);
--- End diff --

If the writer makes a single `writeSerializersAndConfigsWithResilience` call, then here you can also read all of the written serializers and configs with a single `readSerializersAndConfigsWithResilience` call. The returned list would have length 2, with the key and value serializer + config pairs in the same order in which you wrote them.
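To illustrate the suggestion, here is a minimal, self-contained sketch of the "write once, read once" pattern. It does not use Flink's classes: `SerializerAndConfig`, `writeAll`, `readAll`, and `roundTrip` are hypothetical stand-ins for the serializer + config pair and for the two `...WithResilience` utility calls, and the wire format (a count followed by the entries) is an assumption made only for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SingleCallRoundTrip {

    // Hypothetical stand-in for a (serializer, config snapshot) pair; not a Flink type.
    static final class SerializerAndConfig {
        final String serializerName;
        final String configName;

        SerializerAndConfig(String serializerName, String configName) {
            this.serializerName = serializerName;
            this.configName = configName;
        }
    }

    // Stand-in for the single write call: writes the count, then every pair in list order.
    static void writeAll(DataOutputStream out, List<SerializerAndConfig> pairs) throws IOException {
        out.writeInt(pairs.size());
        for (SerializerAndConfig pair : pairs) {
            out.writeUTF(pair.serializerName);
            out.writeUTF(pair.configName);
        }
    }

    // Stand-in for the single read call: returns the pairs in the order they were written.
    static List<SerializerAndConfig> readAll(DataInputStream in) throws IOException {
        int count = in.readInt();
        List<SerializerAndConfig> pairs = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            String serializerName = in.readUTF();
            String configName = in.readUTF();
            pairs.add(new SerializerAndConfig(serializerName, configName));
        }
        return pairs;
    }

    // Writes the list with one call and reads it back with one call, all in memory.
    static List<SerializerAndConfig> roundTrip(List<SerializerAndConfig> pairs) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                writeAll(out, pairs);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return readAll(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Writer side: key pair first, value pair second, in a single call.
        List<SerializerAndConfig> read = roundTrip(Arrays.asList(
            new SerializerAndConfig("keySerializer", "keyConfig"),
            new SerializerAndConfig("valueSerializer", "valueConfig")));

        // Reader side: one call, then index 0 is the key pair and index 1 the value pair.
        SerializerAndConfig key = read.get(0);
        SerializerAndConfig value = read.get(1);
        System.out.println(read.size() + " " + key.serializerName + " " + value.serializerName);
    }
}
```

In the reader under review, the equivalent change would be to call `readSerializersAndConfigsWithResilience` once, keep the returned list, and take `get(0)` for the key and `get(1)` for the value, provided the writer emits both in one call and in that order.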
---