[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339259#comment-16339259 ]
ASF GitHub Bot commented on FLINK-8345: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163849551 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java --- @@ -601,21 +805,43 @@ public void addAll(List<S> values) throws Exception { } } + private static <K, V> void deserializeBroadcastStateValues( + final BackendWritableBroadcastState<K, V> broadcastStateForName, + final FSDataInputStream in, + final OperatorStateHandle.StateMetaInfo metaInfo) throws Exception { + + if (metaInfo != null) { + long[] offsets = metaInfo.getOffsets(); + if (offsets != null) { + DataInputView div = new DataInputViewStreamWrapper(in); + + TypeSerializer<K> keySerializer = broadcastStateForName.getStateMetaInfo().getKeySerializer(); + TypeSerializer<V> valueSerializer = broadcastStateForName.getStateMetaInfo().getValueSerializer(); + + for (long offset : offsets) { + in.seek(offset); --- End diff -- As mentioned in `BackendWritableBroadcastState`, we don't need the offsets, then we also don't need to to all the seeking here. > Iterate over keyed state on broadcast side of connect with broadcast. > --------------------------------------------------------------------- > > Key: FLINK-8345 > URL: https://issues.apache.org/jira/browse/FLINK-8345 > Project: Flink > Issue Type: New Feature > Components: Streaming > Affects Versions: 1.5.0 > Reporter: Kostas Kloudas > Assignee: Kostas Kloudas > Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)