[jira] [Commented] (FLINK-3761) Introduce key group state backend

2016-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15298371#comment-15298371
 ] 

ASF GitHub Bot commented on FLINK-3761:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1988#issuecomment-221312885
  
Yes, definitely: better safe than sorry. I will remove the 
`createPartitionedStateBackend` method from `AbstractStateBackend`.


> Introduce key group state backend
> -
>
> Key: FLINK-3761
> URL: https://issues.apache.org/jira/browse/FLINK-3761
> Project: Flink
> Issue Type: Sub-task
> Components: state backends
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
>
> After an off-line discussion with [~aljoscha], we came to the conclusion that 
> it would be beneficial to reflect the differences between a keyed and a 
> non-keyed stream also in the state backends. A state backend which is used 
> for a keyed stream offers a value, list, folding and reducing state and has to 
> group its keys into key groups. 
> A state backend for non-keyed streams can only offer a union state to make it 
> work with dynamic scaling. A union state is a state which is broadcast to 
> all tasks in case of a recovery. The state backends can then select what 
> information they need to recover from the whole state (formerly distributed).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3761) Introduce key group state backend

2016-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15298360#comment-15298360
 ] 

ASF GitHub Bot commented on FLINK-3761:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1988#issuecomment-221311349
  
Yep, this last paragraph about the factory is what I hinted at with my 
comment. This may be somewhat academic, but if there is a method 
`getPartitionedStateBackend`, the likelihood of it being misused is 
fairly high.




[jira] [Commented] (FLINK-3761) Introduce key group state backend

2016-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15298355#comment-15298355
 ] 

ASF GitHub Bot commented on FLINK-3761:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1988#issuecomment-221309557
  
Thanks for the initial feedback @aljoscha :-)

The introduction of `PartitionedState` is indeed not strictly necessary for 
this PR. The idea was that we will have partitioned and non-partitioned state 
in the future. `PartitionedState` is the key-value state backed by the 
`PartitionedStateBackend` whereas non-partitioned state is backed by the 
`AbstractStateBackend`. The first non-partitioned state (apart from the state 
serialized via `CheckpointStateOutputStream`) could be the redistributable 
non-partitioned state necessary for the `KafkaSources`, for example. Thus, the 
`PartitionedState` is more of a logical separation, and it lays the foundation 
so that non-keyed stream operators can also use a proper state abstraction. But 
I can revert it if you deem it redundant or premature.

It is true that the `PartitionedStateBackend` and the 
`KeyGroupStateBackend` have **almost** the same signature. However, the changes 
you've mentioned are imho crucial and made the whole refactoring of the state 
backends necessary in the first place. The difference is that the 
`KeyGroupStateBackend` is aware of the key groups and, consequently, is able to 
snapshot and restore each key group individually. Trying to work around this 
would mean that the `PartitionedStateBackend` always has a single key group 
associated. But for that, it would have to know the sub task index of the 
enclosing `StreamOperator` to assign a sensible key group index. Furthermore, 
it wouldn't make sense to use any other `PartitionedStateBackend` than the 
`KeyGroupStateBackend` (given that it respects the `KeyGroupAssigner`) for the 
`AbstractStreamOperator`, because the data is shuffled according to the key 
group assignments. In general, I think the notion of key groups touches too 
many parts of the Flink runtime for it to make sense any longer to try to 
unify the `KeyGroupStateBackends` and `PartitionedStateBackends`. The state 
backends used by the `AbstractStreamOperator` have to be aware of that notion.
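
To illustrate why key-group awareness matters for rescaling, here is a minimal sketch. The hash-modulo and range-based formulas below are assumptions chosen for illustration, not necessarily what the `KeyGroupAssigner` in this PR does; `KeyGroupSketch` and its method names are hypothetical.

```java
// Hypothetical helper, not part of the pull request.
final class KeyGroupSketch {

    /** Assumed assignment: a key always lands in the same key group. */
    static int keyGroupFor(Object key, int numKeyGroups) {
        return Math.floorMod(key.hashCode(), numKeyGroups);
    }

    /** Assumed assignment: contiguous ranges of key groups per subtask. */
    static int subtaskFor(int keyGroup, int numKeyGroups, int parallelism) {
        return keyGroup * parallelism / numKeyGroups;
    }
}
```

With 8 key groups, a key in key group 5 is handled by subtask 1 at parallelism 2 and by subtask 2 at parallelism 4. The key-to-key-group mapping never changes, so rescaling only moves whole key groups, which is why the backend has to be able to snapshot and restore each key group individually.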

You can regard the `PartitionedStateBackend` as an internal class which was 
introduced to reuse the existing state backend implementations via the 
`GenericKeyGroupStateBackend`. In the future it might make sense to directly 
implement the `KeyGroupStateBackend` interface to decrease the key group 
overhead. It's just unfortunate that Java's package-private visibility does not 
extend to sub-packages. Otherwise, I would have declared 
`createPartitionedStateBackend` as package private. But since the 
`GenericKeyGroupStateBackend` resides in a sub-package of 
`o.a.f.runtime.state`, it cannot access this method. But I think we could 
refactor it the following way: remove `createPartitionedStateBackend`, make 
`createKeyGroupStateBackend` abstract, let the implementations of 
`AbstractStateBackend` implement a `PartitionedStateBackendFactory` interface, 
and define the `createKeyGroupStateBackend` method for all 
`AbstractStateBackend` implementations by creating a 
`GenericKeyGroupStateBackend`, which requires a 
`PartitionedStateBackendFactory`. That would probably be a better design.
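
A rough sketch of what this refactoring could look like. The type names come from the discussion above, but every signature here is a simplified assumption (string keys and values, a `numKeyGroups` parameter, a `snapshot()` method), and `ToyStateBackend` is a hypothetical stand-in for a concrete backend, not real Flink code.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, assumed signatures; the real interfaces are richer.
interface PartitionedStateBackend {
    void put(String key, String value);
    String get(String key);
    Map<String, String> snapshot();
}

interface PartitionedStateBackendFactory {
    PartitionedStateBackend createPartitionedStateBackend();
}

interface KeyGroupStateBackend {
    void put(String key, String value);
    String get(String key);
    Map<String, String> snapshotKeyGroup(int keyGroup);
}

/** Reuses plain backends by keeping one of them per key group. */
final class GenericKeyGroupStateBackend implements KeyGroupStateBackend {
    private final PartitionedStateBackendFactory factory;
    private final int numKeyGroups;
    private final Map<Integer, PartitionedStateBackend> backends = new HashMap<>();

    GenericKeyGroupStateBackend(PartitionedStateBackendFactory factory, int numKeyGroups) {
        this.factory = factory;
        this.numKeyGroups = numKeyGroups;
    }

    private PartitionedStateBackend backendFor(String key) {
        // Assumed key group assignment: hash modulo number of key groups.
        int keyGroup = Math.floorMod(key.hashCode(), numKeyGroups);
        return backends.computeIfAbsent(keyGroup, g -> factory.createPartitionedStateBackend());
    }

    @Override public void put(String key, String value) { backendFor(key).put(key, value); }

    @Override public String get(String key) { return backendFor(key).get(key); }

    @Override public Map<String, String> snapshotKeyGroup(int keyGroup) {
        PartitionedStateBackend backend = backends.get(keyGroup);
        if (backend == null) {
            return new HashMap<>();
        }
        return backend.snapshot();
    }
}

/** Only the key-group-aware creation method remains on the base class. */
abstract class AbstractStateBackend {
    abstract KeyGroupStateBackend createKeyGroupStateBackend(int numKeyGroups);
}

/** Hypothetical concrete backend that doubles as the factory. */
final class ToyStateBackend extends AbstractStateBackend implements PartitionedStateBackendFactory {
    @Override
    public PartitionedStateBackend createPartitionedStateBackend() {
        final Map<String, String> data = new HashMap<>();
        return new PartitionedStateBackend() {
            @Override public void put(String key, String value) { data.put(key, value); }
            @Override public String get(String key) { return data.get(key); }
            @Override public Map<String, String> snapshot() { return new HashMap<>(data); }
        };
    }

    @Override
    KeyGroupStateBackend createKeyGroupStateBackend(int numKeyGroups) {
        return new GenericKeyGroupStateBackend(this, numKeyGroups);
    }
}
```

In this sketch, code that only holds an `AbstractStateBackend` can no longer reach `createPartitionedStateBackend`; it is reachable only through the factory interface that the `GenericKeyGroupStateBackend` receives.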




[jira] [Commented] (FLINK-3761) Introduce key group state backend

2016-05-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15298250#comment-15298250
 ] 

ASF GitHub Bot commented on FLINK-3761:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1988#issuecomment-221289448
  
I started looking into it, but man this is one big change...  

I have some first remarks about API and internals:

What's the reason for the introduction of `PartitionedState`? The Javadoc 
for `State` already says that it is the base class for partitioned state and 
that it is only usable on a `KeyedStream`.

The signature of `KeyGroupedStateBackend` and `PartitionedStateBackend` is 
exactly the same. `AbstractStateBackend` has both methods, 
`createPartitionedStateBackend` and `createKeyGroupStateBackend`. Users of an 
`AbstractStateBackend` should only ever call the latter while the former is 
reserved for internal use by the default implementation for 
`KeyGroupedStateBackend` which is `GenericKeyGroupStateBackend`. Also, 
`AbstractStreamOperator` has the new method `getKeyGroupStateBackend` that 
should be used by operators such as the `WindowOperator` to deal with 
partitioned state. Now, where am I going with this? What I think is that the 
`AbstractStateBackend` should only have a method 
`createPartitionedStateBackend` that is externally visible. This would be used 
by the `AbstractStreamOperator` to create a state backend and users of the 
interface, i.e. `WindowOperator` would also deal just with 
`PartitionedStateBackend`, which they get from 
`AbstractStreamOperator.getPartitionedStateBackend`. The fact that there are 
these key groups should not be visible to users of a state backend. Internally, 
state backends would use the `GenericKeyGroupStateBackend`; they could provide 
an interface to it for creating non-key-grouped backends.

Above, "exactly the same" is not 100 % correct, since the snapshot/restore 
methods differ slightly but I think this could be worked around. Also, I found 
it quite hard to express what I actually mean but I hope you get my point.  




[jira] [Commented] (FLINK-3761) Introduce key group state backend

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15290909#comment-15290909
 ] 

ASF GitHub Bot commented on FLINK-3761:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1988#issuecomment-220293946
  
Yeah, we were also wondering whether it would make sense to allow the state 
itself to be repartitioned, i.e. union and then split into the new parallelism. 
In this way we wouldn't read all state in every operator.
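
A small sketch of the "union and then split" idea, assuming the non-partitioned state of each subtask can be modelled as a list of independent elements. `RepartitionSketch` and the round-robin split are hypothetical choices for illustration; nothing like this exists in the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: redistributes state once, outside the operators.
final class RepartitionSketch {

    /** Unions the per-subtask state and splits it round-robin across the new parallelism. */
    static <T> List<List<T>> repartition(List<List<T>> oldState, int newParallelism) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        int next = 0;
        for (List<T> subtaskState : oldState) {
            for (T element : subtaskState) {
                result.get(next % newParallelism).add(element);
                next++;
            }
        }
        return result;
    }
}
```

Each new subtask then receives only its own slice instead of reading the complete union.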




[jira] [Commented] (FLINK-3761) Introduce key group state backend

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15290885#comment-15290885
 ] 

ASF GitHub Bot commented on FLINK-3761:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1988#issuecomment-220289629
  
In order to support scaling for non-partitioned state, the next step could 
be the introduction of a kind of union state. The idea would be to aggregate 
the non-partitioned state of each subtask and then send it to all subtasks 
upon recovery. Then every subtask can pick from the union state what it needs 
for its execution.
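
A minimal sketch of such a union state, using source partition offsets as the example state (partition id mapped to offset). The class `UnionStateSketch`, the map layout and the modulo-based selection rule are assumptions made for illustration only.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not an existing Flink API.
final class UnionStateSketch {

    /** Aggregates the non-partitioned state of all subtasks into one union state. */
    static Map<Integer, Long> union(List<Map<Integer, Long>> perSubtaskState) {
        Map<Integer, Long> all = new TreeMap<>();
        for (Map<Integer, Long> state : perSubtaskState) {
            all.putAll(state);
        }
        return all;
    }

    /** On recovery every subtask sees the whole union and keeps only what it needs. */
    static Map<Integer, Long> pick(Map<Integer, Long> unionState, int subtaskIndex, int parallelism) {
        Map<Integer, Long> mine = new TreeMap<>();
        for (Map.Entry<Integer, Long> entry : unionState.entrySet()) {
            // Assumed ownership rule: partition id modulo parallelism.
            if (Math.floorMod(entry.getKey(), parallelism) == subtaskIndex) {
                mine.put(entry.getKey(), entry.getValue());
            }
        }
        return mine;
    }
}
```

The selection rule is local to each subtask, so the same union state can be restored with any new parallelism.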




[jira] [Commented] (FLINK-3761) Introduce key group state backend

2016-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15288838#comment-15288838
 ] 

ASF GitHub Bot commented on FLINK-3761:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1988#issuecomment-220002453
  
@gyfora Yes, this is correct. We'll have to put the timers into partitioned 
state and upon restarting iterate over the saved timers to re-set any callbacks 
at the `StreamTask`.

Doing this naively would slow things down. What I had in mind is to still 
keep the timers in an in-memory `Map` but before snapshotting put them into a 
partitioned state. This way, we don't have the overhead every time that we deal 
with timers but just when checkpointing.
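
Roughly what I have in mind, as a sketch only: `TimerSnapshotSketch` is a hypothetical class, the per-key `Map` returned by `snapshot()` merely stands in for real partitioned state, and re-setting callbacks at the `StreamTask` is reduced to re-registering the timers in memory.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch, not the WindowOperator implementation.
final class TimerSnapshotSketch {

    // Hot path: timers live in a plain in-memory map (timestamp -> keys).
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();

    void registerTimer(String key, long timestamp) {
        timers.computeIfAbsent(timestamp, t -> new HashSet<>()).add(key);
    }

    Set<String> keysAt(long timestamp) {
        return timers.getOrDefault(timestamp, new HashSet<>());
    }

    /** Checkpoint path: copy the timers into a per-key structure (stand-in for partitioned state). */
    Map<String, List<Long>> snapshot() {
        Map<String, List<Long>> state = new HashMap<>();
        for (Map.Entry<Long, Set<String>> entry : timers.entrySet()) {
            for (String key : entry.getValue()) {
                state.computeIfAbsent(key, k -> new ArrayList<>()).add(entry.getKey());
            }
        }
        return state;
    }

    /** Restore path: iterate over the saved timers and register them again. */
    static TimerSnapshotSketch restore(Map<String, List<Long>> state) {
        TimerSnapshotSketch restored = new TimerSnapshotSketch();
        for (Map.Entry<String, List<Long>> entry : state.entrySet()) {
            for (long timestamp : entry.getValue()) {
                restored.registerTimer(entry.getKey(), timestamp);
            }
        }
        return restored;
    }
}
```

The per-key copy is built only inside `snapshot()`, so `registerTimer` never touches the state backend.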




[jira] [Commented] (FLINK-3761) Introduce key group state backend

2016-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15288820#comment-15288820
 ] 

ASF GitHub Bot commented on FLINK-3761:
---

Github user gyfora commented on the pull request:

https://github.com/apache/flink/pull/1988#issuecomment-21205
  
So if I understand correctly, there is currently no way to scale jobs with 
non-partitioned state. This also means that window operations (which register 
timers) will not be scalable, right? 

