[jira] [Commented] (FLINK-3761) Introduce key group state backend
[ https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15298371#comment-15298371 ]
ASF GitHub Bot commented on FLINK-3761:
---
Github user tillrohrmann commented on the pull request:
https://github.com/apache/flink/pull/1988#issuecomment-221312885

Yes, definitely: better safe than sorry. I will remove the `createPartitionedStateBackend` method from `AbstractStateBackend`.

> Introduce key group state backend
> ---------------------------------
>
>                 Key: FLINK-3761
>                 URL: https://issues.apache.org/jira/browse/FLINK-3761
>             Project: Flink
>          Issue Type: Sub-task
>          Components: state backends
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> After an offline discussion with [~aljoscha], we came to the conclusion that
> it would be beneficial to reflect the differences between keyed and
> non-keyed streams in the state backends as well. A state backend used
> for a keyed stream offers value, list, and folding state and has to
> group its keys into key groups.
> A state backend for non-keyed streams can only offer a union state to make it
> work with dynamic scaling. A union state is a state that is broadcast to
> all tasks in case of a recovery. The state backends can then select the
> information they need to recover from the whole (formerly distributed) state.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-3761) Introduce key group state backend
[ https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15298360#comment-15298360 ]
ASF GitHub Bot commented on FLINK-3761:
---
Github user aljoscha commented on the pull request:
https://github.com/apache/flink/pull/1988#issuecomment-221311349

Yep, the last paragraph with the factory is what I was hinting at in my comment. This may be somewhat academic, but if there is a method `getPartitionedStateBackend`, the likelihood of it being used wrongly is fairly high.
[jira] [Commented] (FLINK-3761) Introduce key group state backend
[ https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15298355#comment-15298355 ]
ASF GitHub Bot commented on FLINK-3761:
---
Github user tillrohrmann commented on the pull request:
https://github.com/apache/flink/pull/1988#issuecomment-221309557

Thanks for the initial feedback @aljoscha :-)

The introduction of `PartitionedState` is indeed not strictly necessary for this PR. The idea was that we will have partitioned and non-partitioned state in the future. `PartitionedState` is the key-value state backed by the `PartitionedStateBackend`, whereas non-partitioned state is backed by the `AbstractStateBackend`. The first non-partitioned state (apart from the state serialized via `CheckpointStateOutputStream`) could be, for example, the redistributable non-partitioned state necessary for the `KafkaSources`. Thus, `PartitionedState` is more of a logical separation, and it lays the foundation for non-keyed stream operators to also use a proper state abstraction. But I can revert it if you deem it redundant or premature.

It is true that the `PartitionedStateBackend` and the `KeyGroupStateBackend` have **almost** the same signature. However, the changes you've mentioned are imho crucial and made the whole refactoring of the state backends necessary in the first place. The difference is that the `KeyGroupStateBackend` is aware of the key groups and, consequently, is able to snapshot and restore each key group individually. Working around this would mean that the `PartitionedStateBackend` always has a single key group associated with it. But for that, it would have to know the subtask index of the enclosing `StreamOperator` to assign a sensible key group index.
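To make the point about key-group awareness concrete, here is a minimal sketch (not the code from this PR) of how a key could be mapped to a key group and a key group to a subtask. `KeyGroupMath` and its methods are hypothetical; the sketch assumes a fixed number of key groups (`maxParallelism`), contiguous key-group ranges per subtask, and plain `hashCode()` instead of a stronger hash function.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating key-group assignment; not a Flink API.
final class KeyGroupMath {

    private KeyGroupMath() {}

    // Maps a key to one of maxParallelism key groups.
    // floorMod keeps the result non-negative even for negative hash codes.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Maps a key group to the subtask that owns it, so that each subtask
    // owns a contiguous range of key groups.
    static int operatorIndexForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Lists the key groups owned by one subtask. Whole key groups move between
    // subtasks on rescaling, so a backend that snapshots per key group can hand
    // each new subtask exactly the groups it owns.
    static List<Integer> keyGroupsOf(int subtask, int maxParallelism, int parallelism) {
        List<Integer> groups = new ArrayList<>();
        for (int kg = 0; kg < maxParallelism; kg++) {
            if (operatorIndexForKeyGroup(kg, maxParallelism, parallelism) == subtask) {
                groups.add(kg);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        int maxParallelism = 8;
        // With parallelism 2, subtask 0 owns key groups 0..3 and subtask 1 owns 4..7.
        System.out.println(keyGroupsOf(0, maxParallelism, 2)); // [0, 1, 2, 3]
        System.out.println(keyGroupsOf(1, maxParallelism, 2)); // [4, 5, 6, 7]
        // After scaling to 4, each subtask owns two whole key groups.
        System.out.println(keyGroupsOf(3, maxParallelism, 4)); // [6, 7]
    }
}
```

The sketch also shows why the backend cannot be oblivious to key groups: it must know which groups its subtask owns (which depends on the subtask index) to snapshot and restore them individually.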

Furthermore, it wouldn't make sense to use any `PartitionedStateBackend` other than the `KeyGroupStateBackend` (given that it respects the `KeyGroupAssigner`) for the `AbstractStreamOperator`, because the data is shuffled according to the key group assignments. In general, I think the notion of key groups touches too many parts of the Flink runtime for it to still make sense to try to unify the `KeyGroupStateBackends` and `PartitionedStateBackends`. The state backends used by the `AbstractStreamOperator` have to be aware of that notion. You can regard the `PartitionedStateBackend` as an internal class that was introduced to reuse the existing state backend implementations via the `GenericKeyGroupStateBackend`. In the future, it might make sense to implement the `KeyGroupStateBackend` interface directly to decrease the key group overhead.

It's just unfortunate that Java's package-private visibility does not extend to sub-packages. Otherwise, I would have declared `createPartitionedStateBackend` as package private. But since the `GenericKeyGroupStateBackend` resides in a sub-package of `o.a.f.runtime.state`, it cannot access a package-private method there.

But I think we could refactor it the following way: remove `createPartitionedStateBackend`, make `createKeyGroupStateBackend` abstract, let the implementations of `AbstractStateBackend` implement a `PartitionedStateBackendFactory` interface, and implement `createKeyGroupStateBackend` in every `AbstractStateBackend` implementation by creating a `GenericKeyGroupStateBackend`, which requires a `PartitionedStateBackendFactory`. That would probably be a better design.
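The refactoring proposed in the last paragraph could look roughly like this sketch. The type names `AbstractStateBackend`, `PartitionedStateBackend`, `KeyGroupStateBackend`, `GenericKeyGroupStateBackend`, and `PartitionedStateBackendFactory` come from the discussion, but every signature here is a simplified assumption, and `SketchHeapStateBackend` / `SketchHeapPartitionedBackend` are hypothetical stand-ins; this is not the code that was merged.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical keyed-state interface without key groups.
interface PartitionedStateBackend<K> {
    void setCurrentKey(K key);
    Object get(String stateName);
    void put(String stateName, Object value);
    Map<K, Map<String, Object>> snapshot();
}

// Simplified, hypothetical key-group-aware interface: snapshots per key group.
interface KeyGroupStateBackend<K> {
    void setCurrentKey(K key);
    Object get(String stateName);
    void put(String stateName, Object value);
    Map<Integer, Map<K, Map<String, Object>>> snapshotPerKeyGroup();
}

// Implemented by concrete backends, so the generic wrapper can create
// per-key-group backends without a public method on AbstractStateBackend.
interface PartitionedStateBackendFactory {
    <K> PartitionedStateBackend<K> createPartitionedStateBackend();
}

// Keeps one PartitionedStateBackend per key group (hash-modulo assignment assumed).
// setCurrentKey must be called before get/put.
final class GenericKeyGroupStateBackend<K> implements KeyGroupStateBackend<K> {
    private final PartitionedStateBackendFactory factory;
    private final int numKeyGroups;
    private final Map<Integer, PartitionedStateBackend<K>> backends = new HashMap<>();
    private PartitionedStateBackend<K> current;

    GenericKeyGroupStateBackend(PartitionedStateBackendFactory factory, int numKeyGroups) {
        this.factory = factory;
        this.numKeyGroups = numKeyGroups;
    }

    @Override public void setCurrentKey(K key) {
        int keyGroup = Math.floorMod(key.hashCode(), numKeyGroups);
        current = backends.computeIfAbsent(keyGroup, kg -> factory.<K>createPartitionedStateBackend());
        current.setCurrentKey(key);
    }

    @Override public Object get(String stateName) { return current.get(stateName); }

    @Override public void put(String stateName, Object value) { current.put(stateName, value); }

    @Override public Map<Integer, Map<K, Map<String, Object>>> snapshotPerKeyGroup() {
        Map<Integer, Map<K, Map<String, Object>>> result = new HashMap<>();
        backends.forEach((keyGroup, backend) -> result.put(keyGroup, backend.snapshot()));
        return result;
    }
}

abstract class AbstractStateBackend {
    // Now abstract and the only entry point for keyed state.
    abstract <K> KeyGroupStateBackend<K> createKeyGroupStateBackend(int numKeyGroups);
}

// Hypothetical heap-based backend used for a single key group.
final class SketchHeapPartitionedBackend<K> implements PartitionedStateBackend<K> {
    private final Map<K, Map<String, Object>> state = new HashMap<>();
    private K currentKey;

    @Override public void setCurrentKey(K key) { currentKey = key; }

    @Override public Object get(String stateName) {
        Map<String, Object> forKey = state.get(currentKey);
        return forKey == null ? null : forKey.get(stateName);
    }

    @Override public void put(String stateName, Object value) {
        state.computeIfAbsent(currentKey, k -> new HashMap<>()).put(stateName, value);
    }

    @Override public Map<K, Map<String, Object>> snapshot() { return new HashMap<>(state); }
}

// Hypothetical concrete backend: acts as its own factory for the generic wrapper.
final class SketchHeapStateBackend extends AbstractStateBackend implements PartitionedStateBackendFactory {
    @Override public <K> PartitionedStateBackend<K> createPartitionedStateBackend() {
        return new SketchHeapPartitionedBackend<>();
    }

    @Override <K> KeyGroupStateBackend<K> createKeyGroupStateBackend(int numKeyGroups) {
        return new GenericKeyGroupStateBackend<>(this, numKeyGroups);
    }
}
```

With this shape, callers holding an `AbstractStateBackend` can only reach `createKeyGroupStateBackend`; the per-key-group factory method is visible only through the `PartitionedStateBackendFactory` interface that the generic wrapper receives.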
[jira] [Commented] (FLINK-3761) Introduce key group state backend
[ https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15298250#comment-15298250 ]
ASF GitHub Bot commented on FLINK-3761:
---
Github user aljoscha commented on the pull request:
https://github.com/apache/flink/pull/1988#issuecomment-221289448

I started looking into it, but man, this is one big change... I have some first remarks about the API and internals:

What's the reason for the introduction of `PartitionedState`? The Javadoc for `State` already says that it is the base class for partitioned state and that it is only usable on a `KeyedStream`.

The signatures of `KeyGroupedStateBackend` and `PartitionedStateBackend` are exactly the same. `AbstractStateBackend` has both methods, `createPartitionedStateBackend` and `createKeyGroupStateBackend`. Users of an `AbstractStateBackend` should only ever call the latter, while the former is reserved for internal use by `GenericKeyGroupStateBackend`, the default implementation of `KeyGroupedStateBackend`. Also, `AbstractStreamOperator` has the new method `getKeyGroupStateBackend`, which should be used by operators such as the `WindowOperator` to deal with partitioned state.

Now, where am I going with this? I think the `AbstractStateBackend` should only have an externally visible `createPartitionedStateBackend` method. This would be used by the `AbstractStreamOperator` to create a state backend, and users of the interface, i.e. the `WindowOperator`, would also deal only with a `PartitionedStateBackend`, which they get from `AbstractStreamOperator.getPartitionedStateBackend`. The fact that there are key groups should not be visible to users of a state backend. Internally, state backends would use the `GenericKeyGroupStateBackend`; they could provide an interface to it for creating non-key-grouped backends.

Above, "exactly the same" is not 100 % correct, since the snapshot/restore methods differ slightly, but I think this could be worked around.
Also, I found it quite hard to express what I actually mean, but I hope you get my point.
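One way to read this proposal is that the key-group routing sits behind the same interface operators already use, so an operator such as the `WindowOperator` only ever sees a `PartitionedStateBackend`. The sketch below assumes a heavily simplified `PartitionedStateBackend` (a single value per key) and a hypothetical `KeyGroupRoutingBackend` decorator; neither matches the signatures in the PR, and the hash-modulo key-group assignment is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Simplified, hypothetical operator-facing interface: one value per key.
interface PartitionedStateBackend<K, V> {
    void setCurrentKey(K key);
    V value();
    void update(V value);
}

// Plain per-key store without any notion of key groups.
final class SimpleBackend<K, V> implements PartitionedStateBackend<K, V> {
    private final Map<K, V> values = new HashMap<>();
    private K currentKey;

    @Override public void setCurrentKey(K key) { currentKey = key; }
    @Override public V value() { return values.get(currentKey); }
    @Override public void update(V value) { values.put(currentKey, value); }
}

// Decorator that routes each key to a per-key-group backend. Callers only see
// PartitionedStateBackend, so key groups stay an internal detail, while each
// per-group backend could still be snapshotted individually.
// setCurrentKey must be called before value/update.
final class KeyGroupRoutingBackend<K, V> implements PartitionedStateBackend<K, V> {
    private final int numKeyGroups;
    private final Supplier<SimpleBackend<K, V>> factory;
    private final Map<Integer, SimpleBackend<K, V>> perGroup = new HashMap<>();
    private SimpleBackend<K, V> current;

    KeyGroupRoutingBackend(int numKeyGroups, Supplier<SimpleBackend<K, V>> factory) {
        this.numKeyGroups = numKeyGroups;
        this.factory = factory;
    }

    @Override public void setCurrentKey(K key) {
        int keyGroup = Math.floorMod(key.hashCode(), numKeyGroups);
        current = perGroup.computeIfAbsent(keyGroup, kg -> factory.get());
        current.setCurrentKey(key);
    }

    @Override public V value() { return current.value(); }
    @Override public void update(V value) { current.update(value); }

    // Internal view, e.g. for per-key-group snapshots.
    int keyGroupsInUse() { return perGroup.size(); }
}
```

The open point raised above remains visible in this shape: the operator-facing interface hides key groups, but snapshot/restore still has to be key-group aware somewhere below it.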
[jira] [Commented] (FLINK-3761) Introduce key group state backend
[ https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15290909#comment-15290909 ]
ASF GitHub Bot commented on FLINK-3761:
---
Github user aljoscha commented on the pull request:
https://github.com/apache/flink/pull/1988#issuecomment-220293946

Yeah, we were also wondering whether it would make sense to allow the state itself to be repartitioned, i.e. unioned and then split according to the new parallelism. This way, not every operator would have to read all of the state.
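The repartitioning idea (union the state, then split it according to the new parallelism) can be sketched with plain Java lists. `ListStateRepartitioner` is a hypothetical helper, not a Flink API; it assumes each old subtask's non-partitioned state is a list of independent entries and uses round-robin as one possible split strategy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: redistributes per-subtask list state to a new parallelism.
final class ListStateRepartitioner {

    private ListStateRepartitioner() {}

    static <T> List<List<T>> repartition(List<List<T>> oldSubtaskStates, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        // Union: concatenate the state entries of all old subtasks.
        List<T> union = new ArrayList<>();
        for (List<T> subtaskState : oldSubtaskStates) {
            union.addAll(subtaskState);
        }
        // Split: deal the entries round-robin to the new subtasks, so each
        // new subtask only receives (and reads) its own share.
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < union.size(); i++) {
            result.get(i % newParallelism).add(union.get(i));
        }
        return result;
    }
}
```

For example, two old subtasks holding `[p0, p1, p2]` and `[p3]`, rescaled to three subtasks, would yield `[p0, p3]`, `[p1]`, and `[p2]`.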
[jira] [Commented] (FLINK-3761) Introduce key group state backend
[ https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15290885#comment-15290885 ]
ASF GitHub Bot commented on FLINK-3761:
---
Github user tillrohrmann commented on the pull request:
https://github.com/apache/flink/pull/1988#issuecomment-220289629

In order to support scaling for non-partitioned state, the next step could be the introduction of a kind of union state. The idea would be to aggregate the non-partitioned state of each subtask and then send it to all subtasks upon recovery. Every subtask can then pick from the union state what it needs for its execution.
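The union-state recovery described here can be sketched as follows. `UnionStateRecovery` is a hypothetical helper, not a Flink API; the usage assumes the state entries are Kafka partition IDs and that subtask `i` of `n` keeps the partitions with `p % n == i`, which is just one possible selection rule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of union-state recovery; not a Flink API.
final class UnionStateRecovery {

    private UnionStateRecovery() {}

    // Aggregates the non-partitioned state of every old subtask into one list.
    static <T> List<T> union(List<List<T>> perSubtaskState) {
        List<T> all = new ArrayList<>();
        for (List<T> subtaskState : perSubtaskState) {
            all.addAll(subtaskState);
        }
        return all;
    }

    // Every new subtask receives the full union and keeps only what it needs.
    static <T> List<T> select(List<T> unionState, Predicate<T> needed) {
        List<T> mine = new ArrayList<>();
        for (T entry : unionState) {
            if (needed.test(entry)) {
                mine.add(entry);
            }
        }
        return mine;
    }
}
```

Compared with repartitioning the state (union, then split), every subtask here reads the whole union on recovery, which is simpler but costs more as the state grows.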
[jira] [Commented] (FLINK-3761) Introduce key group state backend
[ https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15288838#comment-15288838 ]
ASF GitHub Bot commented on FLINK-3761:
---
Github user aljoscha commented on the pull request:
https://github.com/apache/flink/pull/1988#issuecomment-220002453

@gyfora Yes, this is correct. We'll have to put the timers into partitioned state and, upon restarting, iterate over the saved timers to re-set any callbacks at the `StreamTask`. Doing this naively would slow things down. What I had in mind is to still keep the timers in an in-memory `Map` but put them into a partitioned state before snapshotting. This way, we don't have the overhead every time we deal with timers, only when checkpointing.
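A minimal sketch of that idea, assuming a hypothetical `TimerService` callback interface (standing in for registration at the `StreamTask`) and modelling the partitioned state as a plain `Map` from key to timestamps: timers live in an in-memory structure during normal processing, are copied out only at checkpoint time, and are re-registered on restore. `CheckpointedTimers` is likewise hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for registering a callback at the StreamTask.
interface TimerService {
    void registerTimer(long timestamp);
}

final class CheckpointedTimers<K> {
    // Fast in-memory structure used during normal processing.
    private final Map<K, Set<Long>> timersByKey = new HashMap<>();
    private final TimerService timerService;

    CheckpointedTimers(TimerService timerService) {
        this.timerService = timerService;
    }

    // Registers a callback only if this (key, timestamp) timer is new.
    void registerTimer(K key, long timestamp) {
        if (timersByKey.computeIfAbsent(key, k -> new TreeSet<>()).add(timestamp)) {
            timerService.registerTimer(timestamp);
        }
    }

    // Called only when a checkpoint is taken: copies the timers into the
    // (simplified) partitioned state, so normal processing pays no extra cost.
    Map<K, Set<Long>> snapshotIntoPartitionedState() {
        Map<K, Set<Long>> partitionedState = new HashMap<>();
        for (Map.Entry<K, Set<Long>> e : timersByKey.entrySet()) {
            partitionedState.put(e.getKey(), new TreeSet<>(e.getValue()));
        }
        return partitionedState;
    }

    // On restore, iterates over the saved timers and re-sets the callbacks.
    void restore(Map<K, Set<Long>> partitionedState) {
        timersByKey.clear();
        for (Map.Entry<K, Set<Long>> e : partitionedState.entrySet()) {
            for (long timestamp : e.getValue()) {
                registerTimer(e.getKey(), timestamp);
            }
        }
    }
}
```

Because the timers are stored per key at checkpoint time, they could presumably be redistributed together with the rest of the partitioned state when a job is rescaled.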
[jira] [Commented] (FLINK-3761) Introduce key group state backend
[ https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15288820#comment-15288820 ]
ASF GitHub Bot commented on FLINK-3761:
---
Github user gyfora commented on the pull request:
https://github.com/apache/flink/pull/1988#issuecomment-21205

So if I understand correctly, there is currently no way to scale jobs with non-partitioned state. This also means that window operations (which register timers) will not be scalable, right?