[ https://issues.apache.org/jira/browse/FLINK-3179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15150354#comment-15150354 ]
ASF GitHub Bot commented on FLINK-3179:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1553#discussion_r53152023

    --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---
    @@ -110,28 +135,49 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
                 combinerNode.setParallelism(in.getSource().getParallelism());

                 SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
    -                .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
    +                .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
                 combiner.setCosts(new Costs(0, 0));
                 combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
                 // set sorting comparator key info
                 combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);
                 // set grouping comparator key info
                 combiner.setDriverKeyInfo(this.keyList, 1);
    -
    +
                 Channel toReducer = new Channel(combiner);
                 toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
    -                in.getShipStrategySortOrder(), in.getDataExchangeMode());
    +                in.getShipStrategySortOrder(), in.getDataExchangeMode());
                 if (in.getShipStrategy() == ShipStrategyType.PARTITION_RANGE) {
                     toReducer.setDataDistribution(in.getDataDistribution());
                 }
                 toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
    -                in.getLocalStrategySortOrder());
    +                in.getLocalStrategySortOrder());

                 return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")",
    -                toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
    +                toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
             }
         }
    +    private void addCombinerNodeData(Channel in, Channel toCombiner, SingleInputPlanNode combiner) {
    --- End diff --

    Please rename the method to `setCombinerProperties()`.


> Combiner is not injected if Reduce or GroupReduce input is explicitly partitioned
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-3179
>                 URL: https://issues.apache.org/jira/browse/FLINK-3179
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 0.10.1
>            Reporter: Fabian Hueske
>            Assignee: ramkrishna.s.vasudevan
>            Priority: Critical
>             Fix For: 1.0.0, 0.10.2
>
>
> The optimizer does not inject a combiner if the input of a Reducer or
> GroupReducer is explicitly partitioned, as in the following example:
> {code}
> DataSet<Tuple2<String,Integer>> words = ...
> DataSet<Tuple2<String,Integer>> counts = words
>   .partitionByHash(0)
>   .groupBy(0)
>   .sum(1);
> {code}
> Explicit partitioning can be useful to enforce partitioning on a subset of
> keys or to use a different partitioning method (custom or range partitioning).
> This issue should be fixed by changing the {{instantiate()}} methods of the
> {{ReduceProperties}} and {{GroupReduceWithCombineProperties}} classes such
> that a combiner is injected in front of a {{PartitionPlanNode}} if it is the
> input of a Reduce or GroupReduce operator. This should only happen if the
> Reducer is the only successor of the Partition operator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
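
Note on the quoted issue: the cost of a missing combiner can be illustrated with a small stand-alone simulation. The sketch below is plain Java and does not use Flink or its optimizer classes. The class CombinerSketch and its methods run, combine and ship are hypothetical names made up for this illustration. It assumes a word count in which every input word carries an implicit count of 1, mirroring the partitionByHash(0).groupBy(0).sum(1) example, and it counts how many records cross the simulated hash partitioning with and without a local pre-aggregation step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: simulates the data flow, not Flink's optimizer.
public class CombinerSketch {

    /** Number of records sent across the simulated shuffle by the last call to run(). */
    public static int shippedRecords;

    /** Combine step: sum the counts per word inside one upstream task, before shipping. */
    public static Map<String, Integer> combine(List<String> localWords) {
        Map<String, Integer> partial = new HashMap<>();
        for (String word : localWords) {
            partial.merge(word, 1, Integer::sum);
        }
        return partial;
    }

    /** Sends one record to the reducer chosen by the key hash and adds it to that reducer's sum. */
    public static void ship(List<Map<String, Integer>> reducers, String word, int count) {
        int target = Math.floorMod(word.hashCode(), reducers.size());
        reducers.get(target).merge(word, count, Integer::sum);
        shippedRecords++;
    }

    /** Simulates hash partitioning followed by a grouped sum, optionally with a combiner in front. */
    public static Map<String, Integer> run(List<List<String>> upstreamTasks,
                                           int parallelism, boolean withCombiner) {
        shippedRecords = 0;
        List<Map<String, Integer>> reducers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            reducers.add(new HashMap<>());
        }
        for (List<String> task : upstreamTasks) {
            if (withCombiner) {
                // Only one partial sum per distinct word leaves this task.
                for (Map.Entry<String, Integer> e : combine(task).entrySet()) {
                    ship(reducers, e.getKey(), e.getValue());
                }
            } else {
                // Every input record is shipped individually.
                for (String word : task) {
                    ship(reducers, word, 1);
                }
            }
        }
        // Each word hashes to exactly one reducer, so the per-reducer maps are disjoint.
        Map<String, Integer> result = new HashMap<>();
        for (Map<String, Integer> reducer : reducers) {
            result.putAll(reducer);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> tasks = Arrays.asList(
                Arrays.asList("a", "b", "a", "a"),
                Arrays.asList("b", "b", "c"));

        Map<String, Integer> plain = run(tasks, 2, false);
        int plainShipped = shippedRecords;
        Map<String, Integer> combined = run(tasks, 2, true);
        int combinedShipped = shippedRecords;

        System.out.println("same result: " + plain.equals(combined));
        System.out.println("shipped without combiner: " + plainShipped);
        System.out.println("shipped with combiner: " + combinedShipped);
    }
}
```

For the sample input in main, both variants produce the same sums, but the variant without the combine step ships 7 records while the variant with it ships 4. The final result is unaffected by the combiner; only the volume of data that reaches the partitioning step changes, which is why the issue asks for the combiner to be placed in front of the partition operator.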