Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163934364

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java ---
@@ -112,13 +142,40 @@ public void writeStateMetaInfo(DataOutputView out) throws IOException {
         }
     }
 
+    public static class BroadcastStateMetaInfoWriterV2<K, V> extends AbstractBroadcastStateMetaInfoWriter<K, V> {
+
+        public BroadcastStateMetaInfoWriterV2(
+                final RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> broadcastStateMetaInfo) {
+            super(broadcastStateMetaInfo);
+        }
+
+        @Override
+        public void writeBroadcastStateMetaInfo(final DataOutputView out) throws IOException {
+            out.writeUTF(broadcastStateMetaInfo.getName());
+            out.writeByte(broadcastStateMetaInfo.getAssignmentMode().ordinal());
+
+            // write in a way that allows us to be fault-tolerant and skip blocks in the case of java serialization failures
+            TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
+                out,
+                Collections.singletonList(new Tuple2<>(
+                    broadcastStateMetaInfo.getKeySerializer(),
+                    broadcastStateMetaInfo.getKeySerializerConfigSnapshot())));
+
+            TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
--- End diff --

Combining these two `writeSerializersAndConfigsWithResilience` calls into one call, with a single list containing both the key serializer and the value serializer, would be more space-efficient in the written data:
```
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
    out,
    Arrays.asList(
        Tuple2.of(keySerializer, keySerializerConfig),
        Tuple2.of(valueSerializer, valueSerializerConfig)));
```
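
To illustrate where the savings would come from, here is a minimal, self-contained sketch. It does not use Flink's classes: `writeBlock` is a hypothetical stand-in, and its layout (entry count, one start offset per entry, total payload length, then the payload) is an assumption made for illustration, not the confirmed format of `writeSerializersAndConfigsWithResilience`. Under that assumption each call pays a fixed header, so one call with two entries writes fewer bytes than two calls with one entry each.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class BlockOverheadSketch {

    // Hypothetical stand-in for a resilient list writer (assumed layout):
    // entry count, one start offset per entry, total payload length, payload.
    static void writeBlock(DataOutputStream out, List<byte[]> entries) throws IOException {
        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        out.writeInt(entries.size());      // per-call header: number of entries
        for (byte[] entry : entries) {
            out.writeInt(payload.size());  // start offset of this entry in the payload
            payload.write(entry);
        }
        out.writeInt(payload.size());      // per-call header: total payload length
        payload.writeTo(out);
    }

    // Two calls, one entry each (mirrors the code under review).
    static int separateCallsSize(byte[] key, byte[] value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        writeBlock(out, Collections.singletonList(key));
        writeBlock(out, Collections.singletonList(value));
        out.flush();
        return bytes.size();
    }

    // One call with both entries (mirrors the suggested change).
    static int combinedCallSize(byte[] key, byte[] value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        writeBlock(out, Arrays.asList(key, value));
        out.flush();
        return bytes.size();
    }

    public static void main(String[] args) throws IOException {
        // Placeholder bytes standing in for serialized serializers and configs.
        byte[] key = "key-serializer".getBytes(StandardCharsets.UTF_8);
        byte[] value = "value-serializer".getBytes(StandardCharsets.UTF_8);
        System.out.println("separate: " + separateCallsSize(key, value));
        System.out.println("combined: " + combinedCallSize(key, value));
    }
}
```

With this assumed layout, the sample inputs (14 and 16 bytes) take 54 bytes when written in two calls and 46 bytes in one; the 8-byte difference is the count field and the length field that are no longer duplicated.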
---