Github user fhueske commented on the pull request:
https://github.com/apache/flink/pull/1553#issuecomment-176754489
You identified the right classes and methods for the fix, but the change is
not in the right place within the method. Let me explain the issue.
In the common case, for example in a WordCount program, the operator
order looks like this:
```
[Map] --hash-partition--> [Reduce]
```
In this case, a combiner is appended to the Map to reduce the data
before it is partitioned over the network. This looks like:
```
[Map] --local-fwd--> [Combine] --hash-partition--> [Reduce]
```
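To make the benefit of the combiner concrete, here is a plain-Java sketch that counts how many records a single Map instance would ship over the network for a WordCount-style input, with and without local pre-aggregation. It does not use any Flink API; the class and method names are hypothetical and only model the idea.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not Flink code: how many records one sender ships.
class CombinerSketch {
    static int recordsShipped(List<String> words, boolean useCombiner) {
        if (!useCombiner) {
            // Without a combiner, every (word, 1) pair is hash-partitioned individually.
            return words.size();
        }
        // With a combiner, pairs with the same key are pre-aggregated locally,
        // so only one (word, partialCount) record per distinct word is shipped.
        Map<String, Integer> partial = new HashMap<>();
        for (String w : words) {
            partial.merge(w, 1, Integer::sum);
        }
        return partial.size();
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("to", "be", "or", "not", "to", "be");
        System.out.println(recordsShipped(words, false)); // 6
        System.out.println(recordsShipped(words, true));  // 4
    }
}
```

The saving grows with the number of repeated keys per sender, which is why a combiner pays off before a network partitioning step but not when the data is only forwarded locally.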
In some cases, Flink knows that the data is already appropriately
partitioned (e.g. after a join):
```
[Join] --local-fwd--> [Reduce]
```
In this case, the data is already partitioned and only forwarded locally, so
no combiner needs to be injected. The check is based on the shipping strategy
of the input channel (this is the `if` branch in `instantiate()`).
In case of an explicit partition operator, the operators look as follows:
```
[Map] --partition--> [Partition] --local-fwd--> [Reduce]
```
Hence, the code enters the `if` branch, because the input shipping strategy
of Reduce is `FORWARD`.
Instead, we would like to inject a combiner between Map and Partition as
follows:
```
[Map] --local-fwd--> [Combine] --partition--> [Partition] --local-fwd--> [Reduce]
```
Hence, we should adapt the condition to also inject a combiner if the input
strategy of Reduce is `FORWARD` and the input operator is a `PartitionNode`.
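The difference between the current check and the proposed one can be modeled with a small, self-contained Java sketch. The enums and methods below are hypothetical simplifications for illustration only, not Flink's optimizer classes or signatures; they just encode the decision for the three plans described in this comment.

```java
// Hypothetical, simplified model of the combiner decision; not Flink's actual classes.
class CombinerDecisionSketch {
    enum ShipStrategy { FORWARD, PARTITION_HASH }
    enum NodeKind { MAP, JOIN, PARTITION }

    // Current behavior: any FORWARD input is treated as already partitioned.
    static boolean injectCombinerCurrent(ShipStrategy inputStrategy, NodeKind inputNode) {
        return inputStrategy != ShipStrategy.FORWARD;
    }

    // Proposed behavior: a FORWARD input that comes from an explicit partition
    // operator still needs a combiner (placed in front of the Partition operator).
    static boolean injectCombinerProposed(ShipStrategy inputStrategy, NodeKind inputNode) {
        return inputStrategy != ShipStrategy.FORWARD || inputNode == NodeKind.PARTITION;
    }

    public static void main(String[] args) {
        // [Map] --hash-partition--> [Reduce]
        System.out.println(injectCombinerProposed(ShipStrategy.PARTITION_HASH, NodeKind.MAP)); // true
        // [Join] --local-fwd--> [Reduce]
        System.out.println(injectCombinerProposed(ShipStrategy.FORWARD, NodeKind.JOIN)); // false
        // [Map] --partition--> [Partition] --local-fwd--> [Reduce]
        System.out.println(injectCombinerCurrent(ShipStrategy.FORWARD, NodeKind.PARTITION)); // false
        System.out.println(injectCombinerProposed(ShipStrategy.FORWARD, NodeKind.PARTITION)); // true
    }
}
```

The condition only decides whether a combiner is needed. The placement differs: in the hash-partition case the combiner sits on the input of Reduce, while in the explicit partition case it has to go between Map and Partition.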
We should add appropriate tests for this feature. I suggest:
- a unit test case in `GroupReduceCompilationTest`
- a unit test case in `ReduceCompilationTest`
- an end-to-end integration test in `javaApiOperators.GroupReduceITCase`
- an end-to-end integration test in `javaApiOperators.ReduceITCase`
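Whatever the placement, the integration tests should confirm that injecting the combiner does not change the program's result. That property can be illustrated without Flink in the following sketch (hypothetical class; the partitioning step is omitted for brevity, so all partial counts go to a single reducer):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical check, not a Flink test: combining before reducing must yield the same counts.
class CombinerCorrectnessSketch {
    // Reduce every (word, 1) pair from all senders directly.
    static Map<String, Integer> countDirect(List<List<String>> senders) {
        Map<String, Integer> result = new TreeMap<>();
        for (List<String> sender : senders) {
            for (String w : sender) result.merge(w, 1, Integer::sum);
        }
        return result;
    }

    static Map<String, Integer> countWithCombiner(List<List<String>> senders) {
        Map<String, Integer> result = new TreeMap<>();
        for (List<String> sender : senders) {
            // Combine step: local pre-aggregation on each sender.
            Map<String, Integer> partial = new HashMap<>();
            for (String w : sender) partial.merge(w, 1, Integer::sum);
            // Reduce step: merge the partial counts received from each sender.
            partial.forEach((w, c) -> result.merge(w, c, Integer::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> senders = Arrays.asList(
                Arrays.asList("a", "b", "a"),
                Arrays.asList("b", "b", "c"));
        System.out.println(countDirect(senders));       // {a=2, b=3, c=1}
        System.out.println(countWithCombiner(senders)); // {a=2, b=3, c=1}
    }
}
```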