[ https://issues.apache.org/jira/browse/FLINK-3179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15123471#comment-15123471 ]

ASF GitHub Bot commented on FLINK-3179:
---------------------------------------

Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1553#issuecomment-176754489
  
    You identified the right classes and methods for the fix, but the change is in the wrong place within the method. Let me explain the issue.
    
    In the common case, for example in a WordCount program, the operator order looks like this:
    ```
    [Map] --hash-partition--> [Reduce]
    ```
    In this case, a combiner is appended to the Map to reduce the data before it is partitioned over the network. This looks like:
    ```
    [Map] --local-fwd--> [Combine] --hash-partition--> [Reduce]
    ```
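    To make the benefit of the combiner concrete, here is a minimal plain-Java sketch that uses no Flink APIs. Each sender pre-aggregates its local `(word, count)` pairs, so fewer records cross the network, and the final reduce still produces the same totals. The names `CombinerSketch`, `WordCount`, `combine` and `reduce` are hypothetical and only model the idea.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    // Hypothetical, Flink-free model of a combiner: local pre-aggregation before the shuffle.
    public class CombinerSketch {

        // A (word, count) record, playing the role of Tuple2<String, Integer>.
        record WordCount(String word, int count) {}

        // Combine: sum counts per word within ONE local partition, before the network shuffle.
        static List<WordCount> combine(List<WordCount> localRecords) {
            Map<String, Integer> sums = new TreeMap<>();
            for (WordCount wc : localRecords) {
                sums.merge(wc.word(), wc.count(), Integer::sum);
            }
            List<WordCount> out = new ArrayList<>();
            sums.forEach((w, c) -> out.add(new WordCount(w, c)));
            return out;
        }

        // Reduce: sum counts per word over all records received after the shuffle.
        static Map<String, Integer> reduce(List<WordCount> shuffled) {
            Map<String, Integer> totals = new TreeMap<>();
            for (WordCount wc : shuffled) {
                totals.merge(wc.word(), wc.count(), Integer::sum);
            }
            return totals;
        }

        public static void main(String[] args) {
            // Output of one Map task: every word emitted with count 1.
            List<WordCount> mapOutput = List.of(
                new WordCount("flink", 1), new WordCount("combiner", 1),
                new WordCount("flink", 1), new WordCount("flink", 1));

            List<WordCount> combined = combine(mapOutput);
            System.out.println("shipped without combiner: " + mapOutput.size()); // 4
            System.out.println("shipped with combiner:    " + combined.size());  // 2
            // Both plans yield the same final totals.
            System.out.println(reduce(mapOutput).equals(reduce(combined)));       // true
        }
    }
    ```

    The combiner is only an optimization. It must not change the result, which is also what the end-to-end tests should verify.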
    In some cases, Flink knows that the data is already appropriately 
partitioned (e.g. after a join):
    ```
    [Join] --local-fwd--> [Reduce]
    ```
    In this case, the data is already local and no combiner needs to be injected. The check is based on the shipping strategy of the input channel (this is the `if` case in `instantiate()`).
    
    In case of an explicit partition operator, the operators look as follows:
    ```
    [Map] --partition--> [Partition] --local-fwd--> [Reduce]
    ```
    Hence, the code enters the `if` case because the input shipping strategy is `FORWARD`.
    Instead, we would like to inject a combiner between Map and Partition as follows:
    ```
    [Map] --local-fwd--> [Combine] --partition--> [Partition] --local-fwd--> [Reduce]
    ```
    Hence, we should adapt the condition to inject a combiner if the input 
strategy of Reduce is `FORWARD` and the input operator is a `PartitionNode`.
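    The adapted condition can be sketched as a simple predicate. This is a hypothetical, simplified model and not the actual `ReduceProperties` / `GroupReduceWithCombineProperties` code. `ShipStrategy` is a stand-in enum with only the two values needed here, and `inputIsPartitionNode` stands in for checking whether the Reduce input is a `PartitionNode`.

    ```java
    // Hypothetical model of the combiner decision; not Flink's actual optimizer classes.
    public class CombinerDecision {

        // Simplified stand-in for the shipping strategy of the Reduce input channel.
        enum ShipStrategy { FORWARD, PARTITION_HASH }

        // Old rule: a FORWARD input means the data is already partitioned, so no combiner.
        static boolean injectCombinerOld(ShipStrategy reduceInput) {
            return reduceInput != ShipStrategy.FORWARD;
        }

        // Adapted rule: also inject a combiner when the FORWARD input comes from an
        // explicit Partition operator (the combiner then goes in front of that Partition).
        static boolean injectCombinerNew(ShipStrategy reduceInput, boolean inputIsPartitionNode) {
            return reduceInput != ShipStrategy.FORWARD || inputIsPartitionNode;
        }

        public static void main(String[] args) {
            // [Map] --hash-partition--> [Reduce]: combiner with both rules.
            System.out.println(injectCombinerOld(ShipStrategy.PARTITION_HASH) + " "
                + injectCombinerNew(ShipStrategy.PARTITION_HASH, false)); // true true
            // [Join] --local-fwd--> [Reduce]: no combiner with either rule.
            System.out.println(injectCombinerOld(ShipStrategy.FORWARD) + " "
                + injectCombinerNew(ShipStrategy.FORWARD, false));        // false false
            // [Partition] --local-fwd--> [Reduce]: only the adapted rule injects one.
            System.out.println(injectCombinerOld(ShipStrategy.FORWARD) + " "
                + injectCombinerNew(ShipStrategy.FORWARD, true));         // false true
        }
    }
    ```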
    
    We should add appropriate tests for this feature. I suggest:
    - a unit test case in `GroupReduceCompilationTest`
    - a unit test case in `ReduceCompilationTest`
    - an end-to-end integration test in `javaApiOperators.GroupReduceITCase` 
    - an end-to-end integration test in `javaApiOperators.ReduceITCase` 


> Combiner is not injected if Reduce or GroupReduce input is explicitly 
> partitioned
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-3179
>                 URL: https://issues.apache.org/jira/browse/FLINK-3179
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 0.10.1
>            Reporter: Fabian Hueske
>            Assignee: ramkrishna.s.vasudevan
>            Priority: Critical
>             Fix For: 1.0.0, 0.10.2
>
>
> The optimizer does not inject a combiner if the input of a Reducer or 
> GroupReducer is explicitly partitioned, as in the following example:
> {code}
> DataSet<Tuple2<String,Integer>> words = ...
> DataSet<Tuple2<String,Integer>> counts = words
>   .partitionByHash(0)
>   .groupBy(0)
>   .sum(1);
> {code}
> Explicit partitioning can be useful to enforce partitioning on a subset of 
> keys or to use a different partitioning method (custom or range partitioning).
> This issue should be fixed by changing the {{instantiate()}} methods of the 
> {{ReduceProperties}} and {{GroupReduceWithCombineProperties}} classes such 
> that a combiner is injected in front of a {{PartitionPlanNode}} if it is the 
> input of a Reduce or GroupReduce operator. This should only happen if the 
> Reducer is the only successor of the Partition operator.
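The rewrite described in the issue can be sketched on a toy, linear plan. The sketch is a hypothetical model and does not use Flink's `PlanNode` / `PartitionPlanNode` classes. Each operator records how many operators consume its output. A Combine is inserted in front of a Partition only if the Partition feeds a Reduce or GroupReduce and that reducer is its only successor. Otherwise, the pre-aggregated data would also reach the Partition's other consumers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified plan model; not Flink's PlanNode / PartitionPlanNode classes.
public class InjectCombinerBeforePartition {

    // An operator in a linear plan, with the number of operators consuming its output.
    record Op(String kind, int successors) {}

    // Returns a copy of the plan with a "Combine" inserted in front of every Partition
    // that feeds a Reduce/GroupReduce and has that reducer as its only successor.
    static List<Op> rewrite(List<Op> plan) {
        List<Op> out = new ArrayList<>();
        for (int i = 0; i < plan.size(); i++) {
            Op op = plan.get(i);
            boolean feedsReducer = i + 1 < plan.size()
                && (plan.get(i + 1).kind().equals("Reduce")
                    || plan.get(i + 1).kind().equals("GroupReduce"));
            if (op.kind().equals("Partition") && feedsReducer && op.successors() == 1) {
                out.add(new Op("Combine", 1));
            }
            out.add(op);
        }
        return out;
    }

    // Renders the plan as "A -> B -> C".
    static String show(List<Op> plan) {
        StringBuilder sb = new StringBuilder();
        for (Op op : plan) {
            if (sb.length() > 0) sb.append(" -> ");
            sb.append(op.kind());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // partitionByHash(0).groupBy(0).sum(1): the reducer is the Partition's only successor.
        List<Op> single = List.of(new Op("Map", 1), new Op("Partition", 1), new Op("Reduce", 0));
        System.out.println(show(rewrite(single))); // Map -> Combine -> Partition -> Reduce

        // The Partition's output is also consumed elsewhere: the plan stays unchanged.
        List<Op> shared = List.of(new Op("Map", 1), new Op("Partition", 2), new Op("Reduce", 0));
        System.out.println(show(rewrite(shared))); // Map -> Partition -> Reduce
    }
}
```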



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
