Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r163840020 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java --- @@ -513,17 +630,100 @@ public void addAll(List<S> values) throws Exception { } } + private <K, V> BroadcastState<K, V> getBroadcastState( + final MapStateDescriptor<K, V> stateDescriptor, + final OperatorStateHandle.Mode mode) throws StateMigrationException { + + Preconditions.checkNotNull(stateDescriptor); + String name = Preconditions.checkNotNull(stateDescriptor.getName()); + + @SuppressWarnings("unchecked") + BackendWritableBroadcastState<K, V> previous = (BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(name); + if (previous != null) { + checkStateNameAndMode( + previous.getStateMetaInfo().getName(), + name, + previous.getStateMetaInfo().getAssignmentMode(), + mode); + return previous; + } + + stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig()); + TypeSerializer<K> broadcastStateKeySerializer = Preconditions.checkNotNull(stateDescriptor.getKeySerializer()); + TypeSerializer<V> broadcastStateValueSerializer = Preconditions.checkNotNull(stateDescriptor.getValueSerializer()); + + BackendWritableBroadcastState<K, V> broadcastState = (BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name); + + if (broadcastState == null) { + broadcastState = new HeapBroadcastState<>( + new RegisteredBroadcastBackendStateMetaInfo<>( + name, + mode, + broadcastStateKeySerializer, + broadcastStateValueSerializer)); + registeredBroadcastStates.put(name, broadcastState); + } else { --- End diff -- Does this do the compatibility-check dance every time the state is accessed? Might be a bit too much and we could do it only the first time a state is accessed after restore.
---