[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339699#comment-16339699 ]
ASF GitHub Bot commented on FLINK-8345: --------------------------------------- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163934364 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java --- @@ -112,13 +142,40 @@ public void writeStateMetaInfo(DataOutputView out) throws IOException { } } + public static class BroadcastStateMetaInfoWriterV2<K, V> extends AbstractBroadcastStateMetaInfoWriter<K, V> { + + public BroadcastStateMetaInfoWriterV2( + final RegisteredBroadcastBackendStateMetaInfo.Snapshot<K, V> broadcastStateMetaInfo) { + super(broadcastStateMetaInfo); + } + + @Override + public void writeBroadcastStateMetaInfo(final DataOutputView out) throws IOException { + out.writeUTF(broadcastStateMetaInfo.getName()); + out.writeByte(broadcastStateMetaInfo.getAssignmentMode().ordinal()); + + // write in a way that allows us to be fault-tolerant and skip blocks in the case of java serialization failures + TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience( + out, + Collections.singletonList(new Tuple2<>( + broadcastStateMetaInfo.getKeySerializer(), + broadcastStateMetaInfo.getKeySerializerConfigSnapshot()))); + + TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience( --- End diff -- Combining these two `writeSerializersAndConfigsWithResilience` calls into one call, with a single list containing both the key serializer and value serializer, would be more space-efficient in the written data: ``` TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience( out, Arrays.asList( Tuple2.of(keySerializer, keySerializerConfig), Tuple2.of(valueSerializer, valueSerializerConfig)); ``` > Iterate over keyed state on broadcast side of connect with broadcast. > --------------------------------------------------------------------- > > Key: FLINK-8345 > URL: https://issues.apache.org/jira/browse/FLINK-8345 > Project: Flink > Issue Type: New Feature > Components: Streaming > Affects Versions: 1.5.0 > Reporter: Kostas Kloudas > Assignee: Kostas Kloudas > Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)