[ https://issues.apache.org/jira/browse/FLINK-4855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15589378#comment-15589378 ]

Greg Hogan commented on FLINK-4855:
-----------------------------------

Hi, and thanks for submitting a ticket for this idea! Has this been discussed on
the dev mailing list (if so, please add a link)? It looks like a major
addition to the DataStream API and should be fully vetted before work starts.

> Add partitionedKeyBy to DataStream
> ----------------------------------
>
>                 Key: FLINK-4855
>                 URL: https://issues.apache.org/jira/browse/FLINK-4855
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Xiaowei Jiang
>            Assignee: MaGuowei
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> After we apply any interesting operation (e.g. reduce) on a KeyedStream, the
> result becomes a DataStream. In many cases, the output still has the same
> or compatible keys as the KeyedStream (logically). But to do further
> operations on these keys, we are forced to call keyBy again. This works
> semantically, but is costly in two respects. First, it destroys the
> possibility of chaining, which is one of the most important optimization
> techniques. Second, keyBy greatly expands the connected components of
> tasks, which has implications for failover optimization.
> To address this shortcoming, we propose a new operator partitionedKeyBy.
> DataStream<T> {
>     public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key);
> }
> Semantically, DataStream.partitionedKeyBy(key) is equivalent to
> DataStream.keyBy(partitionedKey), where partitionedKey is key plus the task ID
> as an extra field. This guarantees that records from different tasks will
> never produce the same key.
> With this, it is possible to write:
> ds.keyBy(key1).reduce(func1)
>     .partitionedKeyBy(key1).reduce(func2)
>     .partitionedKeyBy(key2).reduce(func3);
> Most importantly, in certain cases, we will be able to chain these into a
> single vertex.
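To make the proposed semantics concrete (partitionedKeyBy(key) behaving like keyBy on the key plus the producing task's ID), here is a minimal plain-Java sketch with no Flink dependency. It assumes the "taskid" corresponds to the producing subtask's index, models each parallel subtask as a list of records, and shows only the final keyed state rather than the per-record rolling output of a real reduce. PartitionedKey, reducePerSubtask and reduceGlobally are hypothetical names for illustration, not Flink API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Plain-Java sketch, no Flink dependency. PartitionedKey, reducePerSubtask and
// reduceGlobally are hypothetical names used only to illustrate the proposal.
public class PartitionedKeyBySketch {

    // Composite key from the proposal: the user key plus the index of the
    // subtask that produced the record (assumed to be the "taskid").
    static final class PartitionedKey<K> {
        final int subtaskIndex;
        final K key;

        PartitionedKey(int subtaskIndex, K key) {
            this.subtaskIndex = subtaskIndex;
            this.key = key;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof PartitionedKey)) return false;
            PartitionedKey<?> other = (PartitionedKey<?>) o;
            return subtaskIndex == other.subtaskIndex && Objects.equals(key, other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(subtaskIndex, key);
        }
    }

    // Models partitionedKeyBy(keyFn).reduce(reduceFn) on one subtask: records
    // never leave the subtask that produced them (no shuffle), and the reduce
    // state is keyed by (subtaskIndex, key). Returns the final keyed state.
    static <T, K> Map<PartitionedKey<K>, T> reducePerSubtask(
            int subtaskIndex, List<T> records,
            Function<T, K> keyFn, BinaryOperator<T> reduceFn) {
        Map<PartitionedKey<K>, T> state = new HashMap<>();
        for (T record : records) {
            state.merge(new PartitionedKey<>(subtaskIndex, keyFn.apply(record)), record, reduceFn);
        }
        return state;
    }

    // Models a plain keyBy(keyFn).reduce(reduceFn): records from all subtasks
    // are shuffled by key, so equal keys from different subtasks are merged.
    static <T, K> Map<K, T> reduceGlobally(
            List<List<T>> subtasks, Function<T, K> keyFn, BinaryOperator<T> reduceFn) {
        Map<K, T> state = new HashMap<>();
        for (List<T> records : subtasks) {
            for (T record : records) {
                state.merge(keyFn.apply(record), record, reduceFn);
            }
        }
        return state;
    }

    public static void main(String[] args) {
        Function<Integer, Integer> parity = x -> x % 2;
        List<Integer> subtask0 = List.of(1, 2, 3);
        List<Integer> subtask1 = List.of(5, 4);

        Map<PartitionedKey<Integer>, Integer> local0 = reducePerSubtask(0, subtask0, parity, Integer::sum);
        Map<PartitionedKey<Integer>, Integer> local1 = reducePerSubtask(1, subtask1, parity, Integer::sum);
        Map<Integer, Integer> global = reduceGlobally(List.of(subtask0, subtask1), parity, Integer::sum);

        // The odd key occurs on both subtasks but stays separate per subtask.
        System.out.println(local0.get(new PartitionedKey<>(0, 1))); // 4 (1 + 3)
        System.out.println(local1.get(new PartitionedKey<>(1, 1))); // 5
        System.out.println(global.get(1));                          // 9 (1 + 3 + 5)
    }
}
```

The point the sketch is meant to show: because the subtask index is part of the key, equal user keys on different subtasks never meet, so no record has to cross a subtask boundary. That locality is what could allow the chained, single-vertex execution described in the proposal, whereas the plain keyBy version has to shuffle records to merge the odd key across subtasks (9 instead of 4 and 5).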



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
