[ 
https://issues.apache.org/jira/browse/FLINK-3179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15150383#comment-15150383
 ] 

ASF GitHub Bot commented on FLINK-3179:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1553#discussion_r53153472
  
    --- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
 ---
    @@ -434,6 +431,95 @@ public void 
testCorrectnessOfGroupreduceWithDescendingGroupSort() throws Excepti
        }
     
        @Test
    +   public void 
testCorrectnessOfGroupreduceWithExplicitPartitionOfReducer() throws Exception {
    +           /*
    +            * check correctness of groupReduce with descending group sort
    +            */
    +           final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
    +           env.setParallelism(1);
    +
    +           DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
    +
    +           DataSet<Tuple2<String, Integer>> counts =
    +                   // split up the lines in pairs (2-tuples) containing: 
(word,1)
    +                   ds.flatMap(new Tokenizer()).partitionByHash(0)
    +                           // group by the tuple field "0" and sum up 
tuple field "1"
    +                           .groupBy(0)
    +                           .sum(1);
    +
    +           List<Tuple2<String, Integer>> result = counts.collect();
    +           String expected = "am,1\n"
    +                   +
    +                   "are,1\n" +
    +                   "comment,1\n" +
    +                   "fine,1\n" +
    +                   "hello,3\n" +
    +                   "hi,1\n"+
    +                   "how,1\n"+
    +                   "i,1\n"+
    +                   "lol,1\n"+
    +                   "luke,1\n"+
    +                   "random,1\n"+
    +                   "skywalker,1\n"+
    +                   "world,2\n"+
    +                   "you,1\n";
    +
    +           compareResultAsTuples(result, expected);
    +   }
    +
    +   @Test
    +   public void 
testCorrectnessOfGroupreduceWithoutExplicitPartitionOfReducer() throws 
Exception {
    +           /*
    +            * check correctness of groupReduce with descending group sort
    +            */
    +           final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
    +           env.setParallelism(1);
    --- End diff --
    
    No parallelism of `1` and a custom `GroupReduceFunction` with 
`GroupCombineFunction` interface.


> Combiner is not injected if Reduce or GroupReduce input is explicitly 
> partitioned
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-3179
>                 URL: https://issues.apache.org/jira/browse/FLINK-3179
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 0.10.1
>            Reporter: Fabian Hueske
>            Assignee: ramkrishna.s.vasudevan
>            Priority: Critical
>             Fix For: 1.0.0, 0.10.2
>
>
> The optimizer does not inject a combiner if the input of a Reducer or 
> GroupReducer is explicitly partitioned as in the following example
> {code}
> DataSet<Tuple2<String,Integer>> words = ...
> DataSet<Tuple2<String,Integer>> counts = words
>   .partitionByHash(0)
>   .groupBy(0)
>   .sum(1);
> {code}
> Explicit partitioning can be useful to enforce partitioning on a subset of 
> keys or to use a different partitioning method (custom or range partitioning).
> This issue should be fixed by changing the {{instantiate()}} methods of the 
> {{ReduceProperties}} and {{GroupReduceWithCombineProperties}} classes such 
> that a combine is injected in front of a {{PartitionPlanNode}} if it is the 
> input of a Reduce or GroupReduce operator. This should only happen, if the 
> Reducer is the only successor of the Partition operator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to