[ 
https://issues.apache.org/jira/browse/FLINK-2283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14900521#comment-14900521
 ] 

ASF GitHub Bot commented on FLINK-2283:
---------------------------------------

GitHub user mbalassi opened a pull request:

    https://github.com/apache/flink/pull/1155

    [FLINK-2283] [streaming] Make grouped reduce/fold/aggregations stateful

    There is an open discussion on the related ticket [1] about fully removing
    the operators that this PR touches and partially removes. I can accept
    either outcome of that discussion, but even if the operators are later
    removed from the API, this PR has merit:
    
    1. Cleans up the unused `StreamReduce` and `StreamFold` operators, which
       should be removed either way.
    2. Adds an integration test ensuring that internal streaming operators,
       not only user-defined functions, can rely on the `OperatorState`
       interface. The test currently uses the grouped reduce/fold
       aggregations, but the same guarantee matters just as much for
       windowing state, which is not properly checkpointed yet.
    3. Makes the grouped fold/reduce operators stateful, so that the test
       from point 2 can be written.
    
    Some justification for the implementation choices:
    
    1. @gyfora suggested on the ticket [1] using the partitioned state
       instead of creating the map manually. With that approach the grouped
       operators would no longer be unit-testable, because they would depend
       on the state partitioner information held by the keyed data stream.
       I decided against it.
    
    2. @StephanEwen recently advised against adding unnecessary integration
       tests [2]. This feature, however, can only be tested with an
       integration test. I personally feel that internal operators should be
       covered by a checkpointing test even though they currently use exactly
       the same mechanism as the UDFs, because this implementation may still
       change slightly.
    
    3. Related to the previous point: the `OperatorState` that currently
       stores the internal state is also accessible to the user. This is
       undesirable and might lead to the state being overwritten
       accidentally. I am opening a Jira ticket for this.
    
    [1] https://issues.apache.org/jira/browse/FLINK-2283
    [2] https://mail-archives.apache.org/mod_mbox/flink-dev/201509.mbox/%3CCANC1h_vvekciNVDzqCb8N4E5Kfzu4e1Mosnse1%3DV11HXnD2PBQ%40mail.gmail.com%3E
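
The "manual map" approach from the first implementation choice could look roughly like the sketch below. This is plain Java for illustration only, not the code in this PR: the class `GroupedReduceSketch` and its methods `processElement`, `snapshotState` and `restoreState` are hypothetical names, and the sketch does not use Flink's `OperatorState` interface. It assumes the operator keeps one running value per key in an ordinary map that can be copied out for a checkpoint and copied back in on recovery.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical stand-in for a grouped reduce operator; not Flink's actual class.
final class GroupedReduceSketch<K, T> {
    private final Function<T, K> keySelector;
    private final BinaryOperator<T> reducer;
    // One running aggregate per key: the manually managed map.
    private Map<K, T> values = new HashMap<>();

    GroupedReduceSketch(Function<T, K> keySelector, BinaryOperator<T> reducer) {
        this.keySelector = keySelector;
        this.reducer = reducer;
    }

    // Combines the element with the running value of its key and returns the result.
    T processElement(T element) {
        K key = keySelector.apply(element);
        T current = values.get(key);
        T updated = (current == null) ? element : reducer.apply(current, element);
        values.put(key, updated);
        return updated;
    }

    // Returns a copy of the per-key state, as it might be handed to a checkpoint.
    Map<K, T> snapshotState() {
        return new HashMap<>(values);
    }

    // Replaces the per-key state with a previously taken snapshot.
    void restoreState(Map<K, T> snapshot) {
        values = new HashMap<>(snapshot);
    }
}
```

Because the map lives inside the operator, a unit test can construct the operator directly, feed it elements, and compare a restored instance against the original without any keyed stream context, which is the testability argument made above.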

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mbalassi/flink aggregator-states

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1155.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1155
    
----
commit 29ca808ccb8a1d705927eabb492e70df5e5af06c
Author: mbalassi <mbala...@apache.org>
Date:   2015-09-11T14:32:09Z

    [streaming] Removed unused StreamReduce
    
    Refactored corresponding tests, some minor cleanups.

commit 4bd1dd035780402919bb5257274e9258457dadf3
Author: mbalassi <mbala...@apache.org>
Date:   2015-09-13T06:19:07Z

    [FLINK-2283] [streaming] grouped reduce and fold operators checkpoint state

commit 3688a7c98500179f454e1641aedd7758b1fdc644
Author: mbalassi <mbala...@apache.org>
Date:   2015-09-20T20:27:11Z

    [FLINK-2283] [streaming] Test for checkpointing in internal operators

----


> Make grouped reduce/fold/aggregations stateful using Partitioned state
> ----------------------------------------------------------------------
>
>                 Key: FLINK-2283
>                 URL: https://issues.apache.org/jira/browse/FLINK-2283
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 0.10
>            Reporter: Gyula Fora
>            Assignee: Márton Balassi
>            Priority: Minor
>
> Currently the inner state of the grouped aggregations is not persisted as an 
> operator state. 
> These operators should be reimplemented to use the newly introduced 
> partitioned state abstractions which will make them fault tolerant and 
> scalable for the future.
> A suggested implementation would be to use a stateful mapper to implement the 
> desired behaviour.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
