Repository: flink
Updated Branches:
  refs/heads/master 9beccec45 -> ce10e57bc


[FLINK-7204] [core] CombineHint.NONE

Add a new option to CombineHint that excludes the creation of a
combiner for a reduce function.

Gelly now excludes the combiner when simplifying graphs, an operation
used by most algorithm unit and integration tests.

This closes #4350
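
For reference, a minimal sketch of how a user can apply the new hint to a
grouped reduce (the input data and reduce function below are illustrative,
not part of this commit):

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class CombineHintNoneExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // illustrative (key, value) input
            DataSet<Tuple2<String, Integer>> input = env.fromElements(
                Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3));

            input
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,
                            Tuple2<String, Integer> b) {
                        return Tuple2.of(a.f0, a.f1 + b.f1);
                    }
                })
                // new in this commit: skip the combine phase for this reduce
                .setCombineHint(CombineHint.NONE)
                .print();
        }
    }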


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce10e57b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce10e57b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce10e57b

Branch: refs/heads/master
Commit: ce10e57bc163babd59005fa250c26e7604f23cf5
Parents: 9beccec
Author: Greg Hogan <c...@greghogan.com>
Authored: Sun Jul 16 07:24:59 2017 -0400
Committer: Greg Hogan <c...@greghogan.com>
Committed: Wed Jul 19 14:44:49 2017 -0400

----------------------------------------------------------------------
 .../operators/base/ReduceOperatorBase.java      |  8 ++-
 .../graph/asm/simple/directed/Simplify.java     |  2 +
 .../graph/asm/simple/undirected/Simplify.java   |  2 +
 .../apache/flink/optimizer/dag/ReduceNode.java  | 10 ++--
 .../GroupReduceWithCombineProperties.java       |  2 +-
 .../optimizer/operators/ReduceProperties.java   | 25 ++++----
 .../optimizer/java/ReduceCompilationTest.java   | 62 ++++++++++++++++++--
 7 files changed, 86 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
index 7828748..f97e4d6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
@@ -76,7 +76,13 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI
                 * Use a hash-based strategy. This should be faster in most cases, especially if the number
                 * of different keys is small compared to the number of input elements (eg. 1/10).
                 */
-               HASH
+               HASH,
+
+               /**
+                * Disable the use of a combiner. This can be faster in cases when the number of different keys
+                * is very small compared to the number of input elements (eg. 1/100).
+                */
+               NONE
        }
 
        private CombineHint hint;
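
For comparison, a short sketch of how each hint value is selected on a grouped
reduce or distinct (the `edges` data set is illustrative; setCombineHint() is
the existing DataSet API method used throughout this commit):

    // default: let the optimizer pick the combiner strategy
    edges.distinct(0, 1).setCombineHint(CombineHint.OPTIMIZER_CHOOSES);

    // sort-based or hash-based combiner, as before
    edges.distinct(0, 1).setCombineHint(CombineHint.SORT);
    edges.distinct(0, 1).setCombineHint(CombineHint.HASH);

    // new in this commit: no combiner at all
    edges.distinct(0, 1).setCombineHint(CombineHint.NONE);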

http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
index 1bab9c6..511840a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.asm.simple.directed;
 
 import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
@@ -44,6 +45,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
                                .setParallelism(parallelism)
                                .name("Remove self-loops")
                        .distinct(0, 1)
+                               .setCombineHint(CombineHint.NONE)
                                .setParallelism(parallelism)
                                .name("Remove duplicate edges");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
index 6f1e282..21db233 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.asm.simple.undirected;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
@@ -74,6 +75,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
                                .setParallelism(parallelism)
                                .name("Remove self-loops")
                        .distinct(0, 1)
+                               .setCombineHint(CombineHint.NONE)
                                .setParallelism(parallelism)
                                .name("Remove duplicate edges");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
index e83352e..1a1f3eb 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
@@ -16,12 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer.dag;
 
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.operators.AllReduceProperties;
@@ -29,6 +25,9 @@ import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
 import org.apache.flink.optimizer.operators.ReduceProperties;
 import org.apache.flink.runtime.operators.DriverStrategy;
 
+import java.util.Collections;
+import java.util.List;
+
 /**
  * The Optimizer representation of a <i>Reduce</i> operator.
  */
@@ -63,6 +62,9 @@ public class ReduceNode extends SingleInputNode {
                                case HASH:
                                        combinerStrategy = DriverStrategy.HASHED_PARTIAL_REDUCE;
                                        break;
+                               case NONE:
+                                       combinerStrategy = DriverStrategy.NONE;
+                                       break;
                                default:
                                        throw new RuntimeException("Unknown CombineHint");
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
index 888b670..accd11b 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
@@ -105,7 +105,7 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
                                in.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
                                                                        in.getLocalStrategySortOrder());
                        }
-                       return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", in,
+                       return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in,
                                                                                        DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
                } else {
                        // non forward case. all local properties are killed anyways, so we can safely plug in a combiner

http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
index d8e5a6c..eab31d3 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
@@ -65,20 +65,18 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
 
        @Override
        public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+               Channel toReducer = in;
+
                if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
-                               (node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty()))
-               {
-                       if(in.getSource().getOptimizerNode() instanceof PartitionNode) {
+                               (node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty())) {
+                       if (in.getSource().getOptimizerNode() instanceof PartitionNode) {
                                LOG.warn("Cannot automatically inject combiner for ReduceFunction. Please add an explicit combiner with combineGroup() in front of the partition operator.");
                        }
-                       return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in,
-                                                                                       DriverStrategy.SORTED_REDUCE, this.keyList);
-               }
-               else {
+               } else if (combinerStrategy != DriverStrategy.NONE) {
                        // non forward case. all local properties are killed anyways, so we can safely plug in a combiner
                        Channel toCombiner = new Channel(in.getSource());
                        toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-                       
+
                        // create an input node for combine with same parallelism as input node
                        ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
                        combinerNode.setParallelism(in.getSource().getParallelism());
@@ -89,15 +87,16 @@ public final class ReduceProperties extends 
OperatorDescriptorSingle {
 
                        combiner.setCosts(new Costs(0, 0));
                        combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
-                       
-                       Channel toReducer = new Channel(combiner);
+
+                       toReducer = new Channel(combiner);
                        toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
                                                                                in.getShipStrategySortOrder(), in.getDataExchangeMode());
                        toReducer.setLocalStrategy(LocalStrategy.SORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
-
-                       return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", toReducer,
-                                                                                       DriverStrategy.SORTED_REDUCE, this.keyList);
                }
+
+               return new SingleInputPlanNode(node, "Reduce (" + node.getOperator().getName() + ")", toReducer,
+                       DriverStrategy.SORTED_REDUCE, this.keyList);
+
        }
 
        @Override
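
In short, the refactored instantiate() now defaults the reducer's input channel
to `in` and injects a combiner only when one is both possible and requested; a
paraphrase of the control flow (not the actual source):

    Channel toReducer = in;
    if (/* forward ship strategy, or broadcast connections present */) {
        // no combiner can be injected; warn if the input is an explicit partition
    } else if (combinerStrategy != DriverStrategy.NONE) {
        // build the combiner node and route its output to the reducer
        toReducer = new Channel(combiner);
    }
    return new SingleInputPlanNode(node, name, toReducer,
        DriverStrategy.SORTED_REDUCE, this.keyList);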

http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
index f513155..d2c640f 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -19,23 +19,25 @@
 package org.apache.flink.optimizer.java;
 
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.DriverStrategy;
 
-import static org.junit.Assert.*;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 @SuppressWarnings("serial")
 public class ReduceCompilationTest extends CompilerTestBase implements java.io.Serializable {
@@ -327,4 +329,52 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
                        fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
                }
        }
+
+       /**
+        * Test program compilation when the Reduce's combiner has been excluded
+        * by setting {@code CombineHint.NONE}.
+        */
+       @Test
+       public void testGroupedReduceWithoutCombiner() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(8);
+
+               DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+                       .name("source").setParallelism(6);
+
+               data
+                       .groupBy(0)
+                       .reduce(new RichReduceFunction<Tuple2<String, Double>>() {
+                               @Override
+                               public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2) {
+                                       return null;
+                               }
+                       }).setCombineHint(CombineHint.NONE).name("reducer")
+                       .output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
+
+               Plan p = env.createProgramPlan();
+               OptimizedPlan op = compileNoStats(p);
+
+               OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+               // get the original nodes
+               SourcePlanNode sourceNode = resolver.getNode("source");
+               SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+               SinkPlanNode sinkNode = resolver.getNode("sink");
+
+               // check wiring
+               assertEquals(sourceNode, reduceNode.getInput().getSource());
+
+               // check the strategies
+               assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
+
+               // check the keys
+               assertEquals(new FieldList(0), reduceNode.getKeys(0));
+               assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
+
+               // check parallelism
+               assertEquals(6, sourceNode.getParallelism());
+               assertEquals(8, reduceNode.getParallelism());
+               assertEquals(8, sinkNode.getParallelism());
+       }
 }
