[ 
https://issues.apache.org/jira/browse/FLINK-3179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15150391#comment-15150391
 ] 

ASF GitHub Bot commented on FLINK-3179:
---------------------------------------

Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1553#issuecomment-185170325
  
    Hi @ramkrish86, thanks for the update.
    In addition to my inline comments, we also need to extend the `ReduceITCase`.
    
    We must also handle the case where the output of the partition operator is consumed by more than one function. Consider the following case:
    
    ```
                                         /--fwd--> [Reduce]
    [Input] --shuffle--> [Partitioner] -<
                                         \--fwd--> [Map]
    ```
    
    which should be translated to:
    
    ```
               /--fwd--> [Combine] --shuffle--> [Partitioner] --fwd--> [Reduce]
    [Input] --<
               \--shuffle--> [Partitioner] --fwd--> [Map]
    ```
    
    Both translation tests need to be extended to cover this case.
    
    Thanks, Fabian


> Combiner is not injected if Reduce or GroupReduce input is explicitly 
> partitioned
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-3179
>                 URL: https://issues.apache.org/jira/browse/FLINK-3179
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 0.10.1
>            Reporter: Fabian Hueske
>            Assignee: ramkrishna.s.vasudevan
>            Priority: Critical
>             Fix For: 1.0.0, 0.10.2
>
>
> The optimizer does not inject a combiner if the input of a Reducer or 
> GroupReducer is explicitly partitioned, as in the following example:
> {code}
> DataSet<Tuple2<String,Integer>> words = ...
> DataSet<Tuple2<String,Integer>> counts = words
>   .partitionByHash(0)
>   .groupBy(0)
>   .sum(1);
> {code}
> Explicit partitioning can be useful to enforce partitioning on a subset of 
> keys or to use a different partitioning method (custom or range partitioning).
> This issue should be fixed by changing the {{instantiate()}} methods of the 
> {{ReduceProperties}} and {{GroupReduceWithCombineProperties}} classes such 
> that a combiner is injected in front of a {{PartitionPlanNode}} if it is the 
> input of a Reduce or GroupReduce operator. This should happen only if the 
> Reducer is the only successor of the Partition operator.
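
The plan shapes discussed in the comment above can be illustrated with a toy model. This is only a sketch and not Flink optimizer code: the class `CombinerPlanSketch`, the method `translate`, and the string-based path notation are hypothetical names made up for illustration. It assumes that each consumer of the partition operator gets its own partitioner, and that only Reduce-like consumers get a combiner in front of the shuffle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the expected plan translation; hypothetical names, not Flink classes.
public class CombinerPlanSketch {

    // Consumers assumed to be combinable in this sketch.
    private static final List<String> COMBINABLE = Arrays.asList("Reduce", "GroupReduce");

    // Returns one path description per consumer of the partition operator.
    public static List<String> translate(List<String> consumers) {
        List<String> paths = new ArrayList<>();
        for (String consumer : consumers) {
            if (COMBINABLE.contains(consumer)) {
                // Combine locally first, then shuffle the pre-aggregated data.
                paths.add("Input -fwd-> Combine -shuffle-> Partitioner -fwd-> " + consumer);
            } else {
                // Other consumers must see the uncombined, partitioned input.
                paths.add("Input -shuffle-> Partitioner -fwd-> " + consumer);
            }
        }
        return paths;
    }

    public static void main(String[] args) {
        for (String path : translate(Arrays.asList("Reduce", "Map"))) {
            System.out.println(path);
        }
    }
}
```

With a single `Reduce` consumer, the model yields only the combine path, which corresponds to the case described in the issue. With `Reduce` and `Map` it yields the two branches from the second diagram in the comment.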


