[ https://issues.apache.org/jira/browse/FLINK-1060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14113706#comment-14113706 ]

Fabian Hueske commented on FLINK-1060:
--------------------------------------

I started working on this issue and have a first version in this branch:
https://github.com/fhueske/incubator-flink/tree/userPartition

I decided to go with three partition methods for now:
- {{DataSet.partitionHashBy(int...)}} and 
{{DataSet.partitionHashBy(KeySelector)}} to perform an explicit hash 
partitioning
- {{DataSet.rebalance()}} to evenly shuffle data in a round-robin fashion
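To make the difference between the two strategies concrete, here is a minimal, framework-free Java sketch over in-memory lists. It does not use the Flink API: the class PartitionSketch and its methods hashPartition and rebalance are hypothetical names chosen for illustration, and the hash-modulo target selection is an assumption about how a hash partitioner picks a partition, not a description of the code in the branch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical, framework-free sketch of hash partitioning and round-robin
 * rebalancing. Not Flink code; all names are illustrative.
 */
public class PartitionSketch {

    /** Creates n empty partitions. */
    static <T> List<List<T>> emptyPartitions(int n) {
        List<List<T>> parts = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            parts.add(new ArrayList<>());
        }
        return parts;
    }

    /** Hash partitioning (assumed hash-modulo): equal keys always land in the same partition. */
    static <T, K> List<List<T>> hashPartition(List<T> records, Function<T, K> keySelector, int n) {
        List<List<T>> parts = emptyPartitions(n);
        for (T record : records) {
            int hash = keySelector.apply(record).hashCode();
            // floorMod keeps the index non-negative for negative hash codes
            parts.get(Math.floorMod(hash, n)).add(record);
        }
        return parts;
    }

    /** Round-robin rebalancing: the i-th record goes to partition i mod n. */
    static <T> List<List<T>> rebalance(List<T> records, int n) {
        List<List<T>> parts = emptyPartitions(n);
        int next = 0;
        for (T record : records) {
            parts.get(next).add(record);
            next = (next + 1) % n;
        }
        return parts;
    }

    public static void main(String[] args) {
        List<String> words = List.of("a", "b", "a", "c", "b", "a", "d");
        // "a".hashCode() == 97, so all "a" records go to partition 97 % 3 == 1
        System.out.println(hashPartition(words, w -> w, 3)); // [[c], [a, a, a, d], [b, b]]
        System.out.println(rebalance(words, 3));             // [[a, c, d], [b, b], [a, a]]
    }
}
```

Hash partitioning keeps equal keys together but can be skewed (four of seven records land in one partition here), while round-robin ignores keys and produces partition sizes that differ by at most one record.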

The feature is realized at the Java API level using a new PartitionOperator, 
which is eventually translated into a NoOpDriver with the appropriate ShipStrategies.

Open issues are:
- The DOP (degree of parallelism) of the partitioner should be the same as that 
of the subsequent task. This needs to be fixed while translating the data flow. 
If the PartitionOperator has more than one subsequent task with different DOPs, 
things become trickier, because the data needs to be shipped twice in that case.
- The feature is currently not covered by tests.
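The DOP issue follows from how a hash partitioner chooses a target: the target index depends on the number of partitions. The hypothetical sketch below (plain Java, assuming hash-modulo target selection; DopMismatchSketch and keysThatMove are illustrative names) lists the keys that land in a different partition under DOP 4 than under DOP 8, which is why one partitioning cannot serve two consumers with different DOPs.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical illustration: under (assumed) hash-modulo partitioning, the
 * target partition of a key depends on the DOP of the receiving task.
 */
public class DopMismatchSketch {

    /** Target partition for a key when there are n partitions. */
    static int targetPartition(Object key, int n) {
        return Math.floorMod(key.hashCode(), n);
    }

    /** Returns the keys whose target partition differs between the two DOPs. */
    static List<Integer> keysThatMove(List<Integer> keys, int dopA, int dopB) {
        List<Integer> moved = new ArrayList<>();
        for (Integer key : keys) {
            if (targetPartition(key, dopA) != targetPartition(key, dopB)) {
                moved.add(key);
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        List<Integer> keys = List.of(0, 1, 2, 3, 4, 5, 6, 7);
        // Integer.hashCode() is the value itself, so keys 4..7 map to
        // partitions 0..3 under DOP 4 but to partitions 4..7 under DOP 8.
        System.out.println(keysThatMove(keys, 4, 8)); // [4, 5, 6, 7]
    }
}
```

With integer keys 0 to 7, half of the keys move, so the records partitioned for a DOP-4 consumer would have to be shipped again for a DOP-8 consumer.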


> Support explicit shuffling of DataSets
> --------------------------------------
>
>                 Key: FLINK-1060
>                 URL: https://issues.apache.org/jira/browse/FLINK-1060
>             Project: Flink
>          Issue Type: Improvement
>          Components: Java API, Optimizer
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>            Priority: Minor
>
> Right now, Flink only shuffles data if it is required by some operation such 
> as Reduce, Join, or CoGroup. There is no way to explicitly shuffle a data set.
> However, in some situations explicit shuffling would be very helpful, 
> including:
> - rebalancing before compute-intensive Map operations
> - balancing, random or hash partitioning before PartitionMap operations (see 
> FLINK-1053)
> - better integration of support for HadoopJobs (see FLINK-838)
> With this issue, I propose to add the following methods to {{DataSet}}:
> - {{DataSet.partitionHashBy(int...)}} and 
> {{DataSet.partitionHashBy(KeySelector)}} to perform an explicit hash 
> partitioning
> - {{DataSet.partitionRandomly()}} to shuffle data completely randomly
> - {{DataSet.partitionRoundRobin()}} to shuffle data in a round-robin fashion, 
> which generates a very even distribution with a possible bias due to prior 
> distributions
> The {{DataSet.partitionRoundRobin()}} method might not be necessary if we think 
> that random shuffling balances the data well enough.
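Whether round-robin is needed on top of random shuffling can be checked with a quick experiment. The following hypothetical Java sketch (BalanceSketch and its methods are illustrative names, not proposed API) counts how many of 10,000 records each of 8 partitions receives under random assignment and under round-robin assignment from a single sender.

```java
import java.util.Arrays;
import java.util.Random;

/**
 * Hypothetical comparison of random vs. round-robin assignment,
 * measured by how evenly records spread over the partitions.
 */
public class BalanceSketch {

    /** Counts records per partition when each record picks a partition at random. */
    static int[] randomCounts(int numRecords, int n, long seed) {
        Random rnd = new Random(seed);
        int[] counts = new int[n];
        for (int i = 0; i < numRecords; i++) {
            counts[rnd.nextInt(n)]++;
        }
        return counts;
    }

    /** Counts records per partition under round-robin assignment (record i goes to i mod n). */
    static int[] roundRobinCounts(int numRecords, int n) {
        int[] counts = new int[n];
        for (int i = 0; i < numRecords; i++) {
            counts[i % n]++;
        }
        return counts;
    }

    /** Difference between the largest and the smallest partition. */
    static int spread(int[] counts) {
        return Arrays.stream(counts).max().getAsInt() - Arrays.stream(counts).min().getAsInt();
    }

    public static void main(String[] args) {
        int records = 10_000;
        int partitions = 8;
        int[] random = randomCounts(records, partitions, 42L);
        int[] rr = roundRobinCounts(records, partitions);
        System.out.println("random:      " + Arrays.toString(random) + " spread=" + spread(random));
        System.out.println("round-robin: " + Arrays.toString(rr) + " spread=" + spread(rr));
    }
}
```

With a single sender, round-robin yields a spread of at most one record (here exactly 1250 records per partition), whereas the random spread depends on the seed and is typically larger; whether that difference matters in practice is the trade-off the issue raises.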



--
This message was sent by Atlassian JIRA
(v6.2#6252)
