[ https://issues.apache.org/jira/browse/FLINK-4940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kostas Kloudas reassigned FLINK-4940:
-------------------------------------

    Assignee: Kostas Kloudas

> Add support for broadcast state
> -------------------------------
>
>                 Key: FLINK-4940
>                 URL: https://issues.apache.org/jira/browse/FLINK-4940
>             Project: Flink
>          Issue Type: Sub-task
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>            Assignee: Kostas Kloudas
>            Priority: Major
>
> As mentioned in
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
> we need broadcast state to support job patterns where one (or several)
> inputs are broadcast to all operator instances and where we keep state that
> is mutated only based on input from broadcast inputs. This special
> restriction ensures that the broadcast state is the same on all parallel
> operator instances when checkpointing (except when using at-least-once mode).
> We therefore only have to checkpoint the state of one arbitrary instance, for
> example instance 0.
> For the different types of side inputs we need different types of state.
> Luckily, the side input types align with the state types we currently have
> for keyed state:
>  - {{ValueState}}
>  - {{ListState}}
>  - {{MapState}}
> We can therefore reuse keyed state backends for our purposes but need to put
> a more restrictive API in front of them: mutation of broadcast state must only
> be allowed when actually processing broadcast input. Without this check,
> users can (by mistake) modify broadcast state while processing other input.
> This would lead to incorrect results which are very hard to notice, much
> less debug.
> With the way the Flink state API works (users can get a {{State}} in
> {{open()}} and work with state by calling methods on that) we have to add
> special wrapping state classes that only allow modification of state when
> processing a broadcast element.
> For the API, I propose to add a new interface `InternalStateAccessor`:
> {code}
> /**
>  * Interface for accessing persistent state.
>  */
> @PublicEvolving
> public interface InternalStateAccessor {
>   <N, S extends State> S state(
>     N namespace,
>     TypeSerializer<N> namespaceSerializer,
>     StateDescriptor<S, ?> stateDescriptor);
> }
> {code}
> This is the same as `KeyedStateBackend.getPartitionedState()` but allows us
> to abstract away the special nature of broadcast state. This is also meant as
> an internal interface and is not to be exposed to user functions. Only
> operators should deal with this.
> {{AbstractStreamOperator}} would get a new method
> `getBroadcastStateAccessor()` that returns an implementation of this
> interface. The implementation would have a {{KeyedStateBackend}} but wrap the
> state in special wrappers that only allow modification when processing
> broadcast elements (as mentioned above).
> On the lower implementation levels, we have to add a new entry for our state
> to `OperatorSnapshotResult`. For example:
> {code}
> private RunnableFuture<KeyGroupsStateHandle> broadcastStateManagedFuture;
> {code}
> Also, the {{CheckpointCoordinator}} and savepoint/checkpoint serialisation
> logic will have to be adapted to support this new kind of state. With the
> ongoing changes in supporting incremental snapshotting and other new features
> for `KeyedStateBackend`, this should be coordinated with [~StephanEwen] and/or
> [~stefanrichte...@gmail.com] and/or [~xiaogang.shi]. We also have to be very
> careful about maintaining compatibility with savepoints from older versions.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
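
The "special wrapping state classes" proposed in the issue could look roughly like the sketch below. It is only an illustration in plain Java and does not use any Flink classes: `SimpleValueState`, `InMemoryValueState`, `BroadcastContext` and `GuardedValueState` are hypothetical names invented for this example. The issue itself proposes wrapping state obtained from a {{KeyedStateBackend}}, which is not modelled here. The sketch assumes the operator flips a flag around the processing of each broadcast element and that the wrapper rejects writes at any other time.

```java
// Sketch only: none of these types exist in Flink; all names are hypothetical.
final class BroadcastStateSketch {

    /** Minimal stand-in for a value-state handle (not Flink's ValueState). */
    interface SimpleValueState<T> {
        T value();
        void update(T newValue);
    }

    /** Plain in-memory state, standing in for state held by a state backend. */
    static final class InMemoryValueState<T> implements SimpleValueState<T> {
        private T current;

        @Override
        public T value() {
            return current;
        }

        @Override
        public void update(T newValue) {
            current = newValue;
        }
    }

    /** Flag that the operator sets while it processes a broadcast element. */
    static final class BroadcastContext {
        private boolean processingBroadcastElement;

        boolean isProcessingBroadcastElement() {
            return processingBroadcastElement;
        }

        /** Runs the action with mutation enabled, then disables it again. */
        void runForBroadcastElement(Runnable action) {
            processingBroadcastElement = true;
            try {
                action.run();
            } finally {
                processingBroadcastElement = false;
            }
        }
    }

    /** Wrapper: reads are always allowed, writes only during broadcast processing. */
    static final class GuardedValueState<T> implements SimpleValueState<T> {
        private final SimpleValueState<T> delegate;
        private final BroadcastContext context;

        GuardedValueState(SimpleValueState<T> delegate, BroadcastContext context) {
            this.delegate = delegate;
            this.context = context;
        }

        @Override
        public T value() {
            return delegate.value();
        }

        @Override
        public void update(T newValue) {
            if (!context.isProcessingBroadcastElement()) {
                throw new IllegalStateException(
                    "Broadcast state may only be modified while processing a broadcast element.");
            }
            delegate.update(newValue);
        }
    }

    public static void main(String[] args) {
        BroadcastContext context = new BroadcastContext();
        SimpleValueState<String> rules =
            new GuardedValueState<String>(new InMemoryValueState<String>(), context);

        // Broadcast input: mutation is permitted.
        context.runForBroadcastElement(() -> rules.update("rule-v1"));
        System.out.println(rules.value());

        // Non-broadcast input: mutation is rejected.
        try {
            rules.update("rule-v2");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing loudly on an illegal write, rather than silently ignoring it, fits the concern raised in the issue that accidental modifications would otherwise produce incorrect results that are very hard to notice.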