[ https://issues.apache.org/jira/browse/FLINK-3179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15150342#comment-15150342 ]
ASF GitHub Bot commented on FLINK-3179:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1553#discussion_r53151628
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
@@ -110,28 +135,49 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
combinerNode.setParallelism(in.getSource().getParallelism());
SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
- .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
+ .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
combiner.setCosts(new Costs(0, 0));
combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
// set sorting comparator key info
combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);
// set grouping comparator key info
combiner.setDriverKeyInfo(this.keyList, 1);
-
+
Channel toReducer = new Channel(combiner);
toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
- in.getShipStrategySortOrder(), in.getDataExchangeMode());
+ in.getShipStrategySortOrder(), in.getDataExchangeMode());
if (in.getShipStrategy() == ShipStrategyType.PARTITION_RANGE) {
toReducer.setDataDistribution(in.getDataDistribution());
}
toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
- in.getLocalStrategySortOrder());
+ in.getLocalStrategySortOrder());
return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")",
- toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
+ toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
}
}
+ private void addCombinerNodeData(Channel in, Channel toCombiner, SingleInputPlanNode combiner) {
+ combiner.setCosts(new Costs(0, 0));
+ combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
+ // set sorting comparator key info
+ combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);
+ // set grouping comparator key info
+ combiner.setDriverKeyInfo(this.keyList, 1);
+ }
+
+ private SingleInputPlanNode getReducerSingleInputPlanNode(Channel in, SingleInputNode node) {
--- End diff --
I don't think we need to move this into a function.
> Combiner is not injected if Reduce or GroupReduce input is explicitly
> partitioned
> ---------------------------------------------------------------------------------
>
> Key: FLINK-3179
> URL: https://issues.apache.org/jira/browse/FLINK-3179
> Project: Flink
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 0.10.1
> Reporter: Fabian Hueske
> Assignee: ramkrishna.s.vasudevan
> Priority: Critical
> Fix For: 1.0.0, 0.10.2
>
>
> The optimizer does not inject a combiner if the input of a Reducer or
> GroupReducer is explicitly partitioned, as in the following example:
> {code}
> DataSet<Tuple2<String,Integer>> words = ...
> DataSet<Tuple2<String,Integer>> counts = words
> .partitionByHash(0)
> .groupBy(0)
> .sum(1);
> {code}
> Explicit partitioning can be useful to enforce partitioning on a subset of
> keys or to use a different partitioning method (custom or range partitioning).
> This issue should be fixed by changing the {{instantiate()}} methods of the
> {{ReduceProperties}} and {{GroupReduceWithCombineProperties}} classes such
> that a combine is injected in front of a {{PartitionPlanNode}} if it is the
> input of a Reduce or GroupReduce operator. This should happen only if the
> Reducer is the only successor of the Partition operator.
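
To illustrate why the missing combiner matters, here is a minimal sketch in plain Java. It does not use the Flink API and is not the optimizer code; the class CombinerSketch and its methods are hypothetical names chosen for this illustration. It assumes two source partitions of (word, count) records and compares how many records cross the partitioning step with and without a local pre-aggregation in front of it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only: simulates a combine step placed in front
// of a partitioning step. None of these names come from Flink.
public class CombinerSketch {

    /** Sums the values per key; serves as both the local combine and the final reduce. */
    static Map<String, Integer> sumByKey(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Map.Entry<String, Integer> r : records) {
            sums.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return sums;
    }

    /** Returns the records that cross the partitioning step, optionally combined per source partition. */
    static List<Map.Entry<String, Integer>> ship(
            List<List<Map.Entry<String, Integer>>> sourcePartitions, boolean combine) {
        List<Map.Entry<String, Integer>> shipped = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> partition : sourcePartitions) {
            if (combine) {
                // Pre-aggregate locally so at most one record per key leaves this partition.
                shipped.addAll(sumByKey(partition).entrySet());
            } else {
                shipped.addAll(partition);
            }
        }
        return shipped;
    }

    public static void main(String[] args) {
        List<List<Map.Entry<String, Integer>>> partitions = List.of(
                List.of(Map.entry("a", 1), Map.entry("b", 1), Map.entry("a", 1)),
                List.of(Map.entry("a", 1), Map.entry("a", 1), Map.entry("b", 1)));

        System.out.println("shipped without combiner: " + ship(partitions, false).size());
        System.out.println("shipped with combiner: " + ship(partitions, true).size());
        // The final sums are the same either way; only the shipped volume differs.
        System.out.println("result: " + sumByKey(ship(partitions, true)));
    }
}
```

In this toy input the combined variant ships four records instead of six and produces the same sums, which is the saving that is lost when the combiner is not injected in front of the explicit partitioning.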