[
https://issues.apache.org/jira/browse/FLINK-3179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15126305#comment-15126305
]
ASF GitHub Bot commented on FLINK-3179:
---------------------------------------
Github user fhueske commented on the pull request:
https://github.com/apache/flink/pull/1553#issuecomment-177997324
The `GroupReduceWithCombineProperties.instanciate()` method checks the
shipping strategy of the input channel. In case of the `WordCount` example
*without* explicit hash combiner, the shipping strategy is `PARTITION_HASH` and
the `else` branch will inject a combiner. If you add an explicit partition
operator, the input shipping strategy of the Reduce operator is `FORWARD` and
the `if` branch is executed and does not add a combiner.
Hence the logic has to into the `if` branch and not into the `else` branch.
Or even better add an additional condition to the `if` case
`!(in.getSource().getOptimizerNode() instanceof PartitionNode)` and add an `if
else` branch to handle the special case of the explicit partition operator.
> Combiner is not injected if Reduce or GroupReduce input is explicitly
> partitioned
> ---------------------------------------------------------------------------------
>
> Key: FLINK-3179
> URL: https://issues.apache.org/jira/browse/FLINK-3179
> Project: Flink
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 0.10.1
> Reporter: Fabian Hueske
> Assignee: ramkrishna.s.vasudevan
> Priority: Critical
> Fix For: 1.0.0, 0.10.2
>
>
> The optimizer does not inject a combiner if the input of a Reducer or
> GroupReducer is explicitly partitioned as in the following example
> {code}
> DataSet<Tuple2<String,Integer>> words = ...
> DataSet<Tuple2<String,Integer>> counts = words
> .partitionByHash(0)
> .groupBy(0)
> .sum(1);
> {code}
> Explicit partitioning can be useful to enforce partitioning on a subset of
> keys or to use a different partitioning method (custom or range partitioning).
> This issue should be fixed by changing the {{instantiate()}} methods of the
> {{ReduceProperties}} and {{GroupReduceWithCombineProperties}} classes such
> that a combine is injected in front of a {{PartitionPlanNode}} if it is the
> input of a Reduce or GroupReduce operator. This should only happen, if the
> Reducer is the only successor of the Partition operator.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)