[ https://issues.apache.org/jira/browse/FLINK-4855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-4855:
----------------------------------
    Labels: auto-unassigned stale-major  (was: auto-unassigned)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development.

I see this issue has been marked as Major but is unassigned, and neither it nor its Sub-Tasks have been updated for 30 days. I have gone ahead and added a "stale-major" label to the issue. If this ticket is a Major, please either assign yourself or give an update. Afterwards, please remove the label, or in 7 days the issue will be deprioritized.

> Add partitionedKeyBy to DataStream
> ----------------------------------
>
>                 Key: FLINK-4855
>                 URL: https://issues.apache.org/jira/browse/FLINK-4855
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>            Reporter: Xiaowei Jiang
>            Priority: Major
>              Labels: auto-unassigned, stale-major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> After we apply any interesting operation (e.g. reduce) to a KeyedStream, the
> result becomes a DataStream. In many cases, the output (logically) still has
> the same or compatible keys as the KeyedStream. But to do further operations
> on these keys, we are forced to call keyBy again. This works semantically,
> but it is costly in two respects. First, it rules out chaining, which is one
> of the most important optimization techniques. Second, keyBy greatly expands
> the connected components of tasks, which has implications for failover
> optimization.
>
> To address this shortcoming, we propose a new operator, partitionedKeyBy:
>
>     public class DataStream<T> {
>         // proposed new method
>         public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key) { ... }
>     }
>
> Semantically, DataStream.partitionedKeyBy(key) is equivalent to
> DataStream.keyBy(partitionedKey), where partitionedKey is key plus the task ID
> as an extra field. This guarantees that records from different tasks never
> produce the same key.
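A minimal, self-contained Java sketch of the equivalence stated above. It does not use Flink: `Tagged`, `PartitionedKey`, `keyBy` and `partitionedKeyBy` below are hypothetical helpers that only model the proposed semantics by grouping in-memory records, assuming each record carries the ID of the upstream task that emitted it. Grouping by the user key alone merges the "a" records from tasks 0 and 1, while grouping by (task ID, key) keeps them apart.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical model of the proposal, NOT Flink API: records are grouped in
// memory to show how keyBy and the proposed partitionedKeyBy would differ.
public class PartitionedKeyDemo {

    // A record plus the ID of the (assumed) upstream task that emitted it.
    record Tagged<T>(int taskId, T value) {}

    // The "partitionedKey" from the proposal: user key plus emitting task ID.
    record PartitionedKey<K>(int taskId, K key) {}

    // keyBy(key): group by the user key only, so equal keys coming from
    // different upstream tasks end up in the same group.
    static <T, K> Map<K, List<T>> keyBy(List<Tagged<T>> records, Function<T, K> key) {
        return records.stream().collect(Collectors.groupingBy(
                r -> key.apply(r.value()),
                Collectors.mapping(Tagged::value, Collectors.toList())));
    }

    // partitionedKeyBy(key) modeled as keyBy(key + taskId): records from
    // different upstream tasks can never share a group.
    static <T, K> Map<PartitionedKey<K>, List<T>> partitionedKeyBy(
            List<Tagged<T>> records, Function<T, K> key) {
        return records.stream().collect(Collectors.groupingBy(
                r -> new PartitionedKey<K>(r.taskId(), key.apply(r.value())),
                Collectors.mapping(Tagged::value, Collectors.toList())));
    }

    public static void main(String[] args) {
        // Two upstream tasks (0 and 1) both emit records with key "a".
        List<Tagged<String>> records = List.of(
                new Tagged<>(0, "a"), new Tagged<>(0, "a"), new Tagged<>(0, "b"),
                new Tagged<>(1, "a"), new Tagged<>(1, "c"));

        // keyBy merges "a" from both tasks into groups a, b, c.
        // prints "keyBy groups: 3"
        System.out.println("keyBy groups: " + keyBy(records, s -> s).size());

        // partitionedKeyBy keeps them apart: (0,a), (0,b), (1,a), (1,c).
        // prints "partitionedKeyBy groups: 4"
        System.out.println("partitionedKeyBy groups: "
                + partitionedKeyBy(records, s -> s).size());
    }
}
```

Because every (task ID, key) group holds records emitted by a single upstream task, a downstream operator could in principle consume that group in the same task without a network shuffle, which is the property that would make chaining possible.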
>
> With this, it becomes possible to write:
>
>     ds.keyBy(key1).reduce(func1)
>       .partitionedKeyBy(key1).reduce(func2)
>       .partitionedKeyBy(key2).reduce(func3);
>
> Most importantly, in certain cases we will be able to chain these into a
> single vertex.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)