Repository: flink
Updated Branches:
  refs/heads/master 9beccec45 -> ce10e57bc
[FLINK-7204] [core] CombineHint.NONE Add a new option to CombineHint which excludes the creation of a combiner for a reduce function. Gelly now excludes the combiner when simplifying graphs as used in most algorithm unit and integration tests. This closes #4350 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce10e57b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce10e57b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce10e57b Branch: refs/heads/master Commit: ce10e57bc163babd59005fa250c26e7604f23cf5 Parents: 9beccec Author: Greg Hogan <c...@greghogan.com> Authored: Sun Jul 16 07:24:59 2017 -0400 Committer: Greg Hogan <c...@greghogan.com> Committed: Wed Jul 19 14:44:49 2017 -0400 ---------------------------------------------------------------------- .../operators/base/ReduceOperatorBase.java | 8 ++- .../graph/asm/simple/directed/Simplify.java | 2 + .../graph/asm/simple/undirected/Simplify.java | 2 + .../apache/flink/optimizer/dag/ReduceNode.java | 10 ++-- .../GroupReduceWithCombineProperties.java | 2 +- .../optimizer/operators/ReduceProperties.java | 25 ++++---- .../optimizer/java/ReduceCompilationTest.java | 62 ++++++++++++++++++-- 7 files changed, 86 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java index 7828748..f97e4d6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java +++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java @@ -76,7 +76,13 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI * Use a hash-based strategy. This should be faster in most cases, especially if the number * of different keys is small compared to the number of input elements (eg. 1/10). */ - HASH + HASH, + + /** + * Disable the use of a combiner. This can be faster in cases when the number of different keys + * is very small compared to the number of input elements (eg. 1/100). + */ + NONE } private CombineHint hint; http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java index 1bab9c6..511840a 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java @@ -19,6 +19,7 @@ package org.apache.flink.graph.asm.simple.directed; import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; @@ -44,6 +45,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> { .setParallelism(parallelism) .name("Remove self-loops") .distinct(0, 1) + .setCombineHint(CombineHint.NONE) .setParallelism(parallelism) .name("Remove duplicate edges"); 
http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java index 6f1e282..21db233 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java @@ -19,6 +19,7 @@ package org.apache.flink.graph.asm.simple.undirected; import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint; import org.apache.flink.api.java.DataSet; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; @@ -74,6 +75,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> { .setParallelism(parallelism) .name("Remove self-loops") .distinct(0, 1) + .setCombineHint(CombineHint.NONE) .setParallelism(parallelism) .name("Remove duplicate edges"); http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java index e83352e..1a1f3eb 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java @@ -16,12 +16,8 @@ * limitations under the License. 
*/ - package org.apache.flink.optimizer.dag; -import java.util.Collections; -import java.util.List; - import org.apache.flink.api.common.operators.base.ReduceOperatorBase; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.operators.AllReduceProperties; @@ -29,6 +25,9 @@ import org.apache.flink.optimizer.operators.OperatorDescriptorSingle; import org.apache.flink.optimizer.operators.ReduceProperties; import org.apache.flink.runtime.operators.DriverStrategy; +import java.util.Collections; +import java.util.List; + /** * The Optimizer representation of a <i>Reduce</i> operator. */ @@ -63,6 +62,9 @@ public class ReduceNode extends SingleInputNode { case HASH: combinerStrategy = DriverStrategy.HASHED_PARTIAL_REDUCE; break; + case NONE: + combinerStrategy = DriverStrategy.NONE; + break; default: throw new RuntimeException("Unknown CombineHint"); } http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java index 888b670..accd11b 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java @@ -105,7 +105,7 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi in.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder()); } - return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", in, + return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in, 
DriverStrategy.SORTED_GROUP_REDUCE, this.keyList); } else { // non forward case. all local properties are killed anyways, so we can safely plug in a combiner http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java index d8e5a6c..eab31d3 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java @@ -65,20 +65,18 @@ public final class ReduceProperties extends OperatorDescriptorSingle { @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { + Channel toReducer = in; + if (in.getShipStrategy() == ShipStrategyType.FORWARD || - (node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty())) - { - if(in.getSource().getOptimizerNode() instanceof PartitionNode) { + (node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty())) { + if (in.getSource().getOptimizerNode() instanceof PartitionNode) { LOG.warn("Cannot automatically inject combiner for ReduceFunction. Please add an explicit combiner with combineGroup() in front of the partition operator."); } - return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in, - DriverStrategy.SORTED_REDUCE, this.keyList); - } - else { + } else if (combinerStrategy != DriverStrategy.NONE) { // non forward case. 
all local properties are killed anyways, so we can safely plug in a combiner Channel toCombiner = new Channel(in.getSource()); toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); - + // create an input node for combine with same parallelism as input node ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode(); combinerNode.setParallelism(in.getSource().getParallelism()); @@ -89,15 +87,16 @@ public final class ReduceProperties extends OperatorDescriptorSingle { combiner.setCosts(new Costs(0, 0)); combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties()); - - Channel toReducer = new Channel(combiner); + + toReducer = new Channel(combiner); toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(), in.getShipStrategySortOrder(), in.getDataExchangeMode()); toReducer.setLocalStrategy(LocalStrategy.SORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder()); - - return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", toReducer, - DriverStrategy.SORTED_REDUCE, this.keyList); } + + return new SingleInputPlanNode(node, "Reduce (" + node.getOperator().getName() + ")", toReducer, + DriverStrategy.SORTED_REDUCE, this.keyList); + } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java index f513155..d2c640f 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java @@ -19,23 +19,25 @@ package org.apache.flink.optimizer.java; import 
org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint; import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.optimizer.util.CompilerTestBase; -import org.junit.Test; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.plan.SourcePlanNode; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.DriverStrategy; -import static org.junit.Assert.*; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; @SuppressWarnings("serial") public class ReduceCompilationTest extends CompilerTestBase implements java.io.Serializable { @@ -327,4 +329,52 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S fail(e.getClass().getSimpleName() + " in test: " + e.getMessage()); } } + + /** + * Test program compilation when the Reduce's combiner has been excluded + * by setting {@code CombineHint.NONE}. 
+ */ + @Test + public void testGroupedReduceWithoutCombiner() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(8); + + DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class) + .name("source").setParallelism(6); + + data + .groupBy(0) + .reduce(new RichReduceFunction<Tuple2<String, Double>>() { + @Override + public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2) { + return null; + } + }).setCombineHint(CombineHint.NONE).name("reducer") + .output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink"); + + Plan p = env.createProgramPlan(); + OptimizedPlan op = compileNoStats(p); + + OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op); + + // get the original nodes + SourcePlanNode sourceNode = resolver.getNode("source"); + SingleInputPlanNode reduceNode = resolver.getNode("reducer"); + SinkPlanNode sinkNode = resolver.getNode("sink"); + + // check wiring + assertEquals(sourceNode, reduceNode.getInput().getSource()); + + // check the strategies + assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy()); + + // check the keys + assertEquals(new FieldList(0), reduceNode.getKeys(0)); + assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys()); + + // check parallelism + assertEquals(6, sourceNode.getParallelism()); + assertEquals(8, reduceNode.getParallelism()); + assertEquals(8, sinkNode.getParallelism()); + } }