Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5230#discussion_r163870666

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---
    @@ -513,17 +630,100 @@ public void addAll(List<S> values) throws Exception {
             }
         }

    +    private <K, V> BroadcastState<K, V> getBroadcastState(
    +            final MapStateDescriptor<K, V> stateDescriptor,
    +            final OperatorStateHandle.Mode mode) throws StateMigrationException {
    +
    +        Preconditions.checkNotNull(stateDescriptor);
    +        String name = Preconditions.checkNotNull(stateDescriptor.getName());
    +
    +        @SuppressWarnings("unchecked")
    +        BackendWritableBroadcastState<K, V> previous = (BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(name);
    +        if (previous != null) {
    +            checkStateNameAndMode(
    +                    previous.getStateMetaInfo().getName(),
    +                    name,
    +                    previous.getStateMetaInfo().getAssignmentMode(),
    +                    mode);
    +            return previous;
    +        }
    +
    +        stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
    +        TypeSerializer<K> broadcastStateKeySerializer = Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
    +        TypeSerializer<V> broadcastStateValueSerializer = Preconditions.checkNotNull(stateDescriptor.getValueSerializer());
    +
    +        BackendWritableBroadcastState<K, V> broadcastState = (BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);
    +
    +        if (broadcastState == null) {
    +            broadcastState = new HeapBroadcastState<>(
    +                    new RegisteredBroadcastBackendStateMetaInfo<>(
    +                            name,
    +                            mode,
    +                            broadcastStateKeySerializer,
    +                            broadcastStateValueSerializer));
    +            registeredBroadcastStates.put(name, broadcastState);
    +        } else {
    --- End diff --

    No, because we have the `accessedBroadcastStatesByName.get(name)` lookup above (line 641). As soon as we create or restore the broadcast state, we put it there (line 708). The next time we try to access it, we hit the cache (`accessedBroadcastStatesByName`), so we do not go through the creation/check phase again.
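
To make the argument above concrete, here is a minimal, self-contained sketch of the two-level lookup, not the actual Flink code. The class `BroadcastStateCacheSketch`, its nested `State` type, the `restore` helper, and the `slowPathRuns` counter are all hypothetical names introduced for illustration; only the idea of an "accessed" cache in front of a "registered" map comes from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the cache-in-front-of-registry pattern;
// none of these types are Flink classes.
class BroadcastStateCacheSketch {

    /** Stand-in for a broadcast state object. */
    static final class State {
        final String name;

        State(String name) {
            this.name = name;
        }
    }

    // Plays the role of registeredBroadcastStates (created or restored states).
    private final Map<String, State> registered = new HashMap<>();
    // Plays the role of accessedBroadcastStatesByName (the per-name cache).
    private final Map<String, State> accessed = new HashMap<>();

    // Counts how often the creation/check path runs (illustration only).
    int slowPathRuns = 0;

    /** Simulates a state that was restored before anyone accessed it. */
    void restore(String name) {
        registered.put(name, new State(name));
    }

    State getState(String name) {
        // Fast path: a state that was already accessed is returned directly.
        State previous = accessed.get(name);
        if (previous != null) {
            return previous;
        }

        // Slow path: runs at most once per name.
        slowPathRuns++;
        State state = registered.get(name);
        if (state == null) {
            // First use without restored data: create and register.
            state = new State(name);
            registered.put(name, state);
        }
        // (For a restored state, compatibility checks would go here.)

        // Cache it so later calls skip the creation/check phase.
        accessed.put(name, state);
        return state;
    }

    public static void main(String[] args) {
        BroadcastStateCacheSketch backend = new BroadcastStateCacheSketch();
        backend.restore("rules");
        State first = backend.getState("rules");
        State second = backend.getState("rules");
        System.out.println(first == second);      // prints true
        System.out.println(backend.slowPathRuns); // prints 1
    }
}
```

Under these assumptions, the second `getState("rules")` call returns from the fast path, which is why the branch in question cannot be reached twice for the same name.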
---