[
https://issues.apache.org/jira/browse/FLINK-3179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15150377#comment-15150377
]
ASF GitHub Bot commented on FLINK-3179:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1553#discussion_r53153223
--- Diff:
flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
---
@@ -59,37 +65,69 @@ public DriverStrategy getStrategy() {
@Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode
node) {
if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
- (node.getBroadcastConnections() != null &&
!node.getBroadcastConnections().isEmpty()))
+ (node.getBroadcastConnections() != null &&
!node.getBroadcastConnections().isEmpty()))
{
+ // adjust a sort (changes grouping, so it must be for
this driver to combining sort
+ if(in.getSource().getOptimizerNode() instanceof
PartitionNode) {
+ Channel toCombiner = new
Channel(in.getSource());
+
toCombiner.setShipStrategy(ShipStrategyType.FORWARD,
DataExchangeMode.PIPELINED);
+ // create an input node for combine with same
parallelism as input node
+ ReduceNode combinerNode = ((ReduceNode)
node).getCombinerUtilityNode();
+
combinerNode.setParallelism(in.getSource().getParallelism());
+
if(toCombiner.getSource().getInputs().iterator().hasNext()) {
+ Channel source =
toCombiner.getSource().getInputs().iterator().next();
+ SingleInputPlanNode combiner = new
SingleInputPlanNode(combinerNode,
+ "Combine
("+node.getOperator().getName()+")", source,
+
DriverStrategy.SORTED_PARTIAL_REDUCE, this.keyList);
+ addCombinerProperties(toCombiner,
combiner);
+ Channel combinerChannel = new
Channel(combiner);
+
combinerChannel.setShipStrategy(ShipStrategyType.FORWARD,
DataExchangeMode.PIPELINED);
+ // Create the partition single input
plan node from the existing partition node
+ PlanNode partitionplanNode =
in.getSource().getPlanNode();
+ SingleInputPlanNode partition = new
SingleInputPlanNode(in.getSource().getOptimizerNode(),
partitionplanNode.getNodeName(),
+ combinerChannel,
partitionplanNode.getDriverStrategy());
+
partition.setCosts(partitionplanNode.getNodeCosts());
+
partition.initProperties(partitionplanNode.getGlobalProperties(),
partitionplanNode.getLocalProperties());
+ // Create a reducer such that the input
of the reducer is the partition node
+ Channel toReducer = new
Channel(partition);
+
toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
+ in.getShipStrategySortOrder(),
in.getDataExchangeMode());
+ return new SingleInputPlanNode(node,
"Reduce ("+node.getOperator().getName()+")", toReducer,
+ DriverStrategy.SORTED_REDUCE,
this.keyList);
+ }
+ }
return new SingleInputPlanNode(node, "Reduce
("+node.getOperator().getName()+")", in,
-
DriverStrategy.SORTED_REDUCE, this.keyList);
+ DriverStrategy.SORTED_REDUCE, this.keyList);
}
else {
// non forward case. all local properties are killed
anyways, so we can safely plug in a combiner
Channel toCombiner = new Channel(in.getSource());
toCombiner.setShipStrategy(ShipStrategyType.FORWARD,
DataExchangeMode.PIPELINED);
-
+
// create an input node for combine with same
parallelism as input node
ReduceNode combinerNode = ((ReduceNode)
node).getCombinerUtilityNode();
combinerNode.setParallelism(in.getSource().getParallelism());
SingleInputPlanNode combiner = new
SingleInputPlanNode(combinerNode,
- "Combine
("+node.getOperator().getName()+")", toCombiner,
--- End diff --
No reformatting please.
> Combiner is not injected if Reduce or GroupReduce input is explicitly
> partitioned
> ---------------------------------------------------------------------------------
>
> Key: FLINK-3179
> URL: https://issues.apache.org/jira/browse/FLINK-3179
> Project: Flink
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 0.10.1
> Reporter: Fabian Hueske
> Assignee: ramkrishna.s.vasudevan
> Priority: Critical
> Fix For: 1.0.0, 0.10.2
>
>
> The optimizer does not inject a combiner if the input of a Reducer or
> GroupReducer is explicitly partitioned as in the following example
> {code}
> DataSet<Tuple2<String,Integer>> words = ...
> DataSet<Tuple2<String,Integer>> counts = words
> .partitionByHash(0)
> .groupBy(0)
> .sum(1);
> {code}
> Explicit partitioning can be useful to enforce partitioning on a subset of
> keys or to use a different partitioning method (custom or range partitioning).
> This issue should be fixed by changing the {{instantiate()}} methods of the
> {{ReduceProperties}} and {{GroupReduceWithCombineProperties}} classes such
> that a combine is injected in front of a {{PartitionPlanNode}} if it is the
> input of a Reduce or GroupReduce operator. This should only happen, if the
> Reducer is the only successor of the Partition operator.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)