[ https://issues.apache.org/jira/browse/FLINK-3179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15150377#comment-15150377 ]
ASF GitHub Bot commented on FLINK-3179: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1553#discussion_r53153223 --- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java --- @@ -59,37 +65,69 @@ public DriverStrategy getStrategy() { @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { if (in.getShipStrategy() == ShipStrategyType.FORWARD || - (node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty())) + (node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty())) { + // adjust a sort (changes grouping, so it must be for this driver to combining sort + if(in.getSource().getOptimizerNode() instanceof PartitionNode) { + Channel toCombiner = new Channel(in.getSource()); + toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); + // create an input node for combine with same parallelism as input node + ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode(); + combinerNode.setParallelism(in.getSource().getParallelism()); + if(toCombiner.getSource().getInputs().iterator().hasNext()) { + Channel source = toCombiner.getSource().getInputs().iterator().next(); + SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, + "Combine ("+node.getOperator().getName()+")", source, + DriverStrategy.SORTED_PARTIAL_REDUCE, this.keyList); + addCombinerProperties(toCombiner, combiner); + Channel combinerChannel = new Channel(combiner); + combinerChannel.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); + // Create the partition single input plan node from the existing partition node + PlanNode partitionplanNode = in.getSource().getPlanNode(); + SingleInputPlanNode partition = new SingleInputPlanNode(in.getSource().getOptimizerNode(), partitionplanNode.getNodeName(), + combinerChannel, 
partitionplanNode.getDriverStrategy()); + partition.setCosts(partitionplanNode.getNodeCosts()); + partition.initProperties(partitionplanNode.getGlobalProperties(), partitionplanNode.getLocalProperties()); + // Create a reducer such that the input of the reducer is the partition node + Channel toReducer = new Channel(partition); + toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(), + in.getShipStrategySortOrder(), in.getDataExchangeMode()); + return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", toReducer, + DriverStrategy.SORTED_REDUCE, this.keyList); + } + } return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in, - DriverStrategy.SORTED_REDUCE, this.keyList); + DriverStrategy.SORTED_REDUCE, this.keyList); } else { // non forward case. all local properties are killed anyways, so we can safely plug in a combiner Channel toCombiner = new Channel(in.getSource()); toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - + // create an input node for combine with same parallelism as input node ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode(); combinerNode.setParallelism(in.getSource().getParallelism()); SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, - "Combine ("+node.getOperator().getName()+")", toCombiner, --- End diff -- No reformatting please. 
> Combiner is not injected if Reduce or GroupReduce input is explicitly > partitioned > --------------------------------------------------------------------------------- > > Key: FLINK-3179 > URL: https://issues.apache.org/jira/browse/FLINK-3179 > Project: Flink > Issue Type: Bug > Components: Optimizer > Affects Versions: 0.10.1 > Reporter: Fabian Hueske > Assignee: ramkrishna.s.vasudevan > Priority: Critical > Fix For: 1.0.0, 0.10.2 > > > The optimizer does not inject a combiner if the input of a Reducer or > GroupReducer is explicitly partitioned as in the following example: > {code} > DataSet<Tuple2<String,Integer>> words = ... > DataSet<Tuple2<String,Integer>> counts = words > .partitionByHash(0) > .groupBy(0) > .sum(1); > {code} > Explicit partitioning can be useful to enforce partitioning on a subset of > keys or to use a different partitioning method (custom or range partitioning). > This issue should be fixed by changing the {{instantiate()}} methods of the > {{ReduceProperties}} and {{GroupReduceWithCombineProperties}} classes such > that a combiner is injected in front of a {{PartitionPlanNode}} if it is the > input of a Reduce or GroupReduce operator. This should only happen if the > Reducer is the only successor of the Partition operator. -- This message was sent by Atlassian JIRA (v6.3.4#6332)