Repository: flink
Updated Branches:
  refs/heads/master 24f7fa9ef -> 2da82f915


[FLINK-2106] [runtime] Rename DriverStrategy.MERGE to DriverStrategy.INNER_MERGE

This closes #1052


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2da82f91
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2da82f91
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2da82f91

Branch: refs/heads/master
Commit: 2da82f9154f9563e3fe8454e153d372b4bf18996
Parents: f3dee23
Author: r-pogalz <r.pog...@campus.tu-berlin.de>
Authored: Tue Aug 11 21:40:21 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Sep 8 16:57:51 2015 +0200

----------------------------------------------------------------------
 .../flink/optimizer/costs/CostEstimator.java    |  2 +-
 .../operators/SortMergeJoinDescriptor.java      |  4 ++--
 .../plandump/PlanJSONDumpGenerator.java         |  2 +-
 .../optimizer/java/JoinTranslationTest.java     |  2 +-
 .../flink/runtime/operators/DriverStrategy.java |  2 +-
 .../flink/runtime/operators/JoinDriver.java     |  4 ++--
 .../operators/MatchTaskExternalITCase.java      |  2 +-
 .../flink/runtime/operators/MatchTaskTest.java  | 24 ++++++++++----------
 .../examples/RelationalQueryCompilerTest.java   |  4 ++--
 .../ConnectedComponentsCoGroupTest.java         |  2 +-
 10 files changed, 24 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2da82f91/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
index 3a02735..6a3ff09 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
@@ -195,7 +195,7 @@ public abstract class CostEstimator {
                        // pipelined local union is for free
                        
                        break;
-               case MERGE:
+               case INNER_MERGE:
                        addLocalMergeCost(firstInput, secondInput, driverCosts, costWeight);
                        break;
                case HYBRIDHASH_BUILD_FIRST:

http://git-wip-us.apache.org/repos/asf/flink/blob/2da82f91/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
index 356836a..3ab0aa7 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
@@ -49,7 +49,7 @@ public class SortMergeJoinDescriptor extends 
AbstractJoinDescriptor {
 
        @Override
        public DriverStrategy getStrategy() {
-               return DriverStrategy.MERGE;
+               return DriverStrategy.INNER_MERGE;
        }
 
        @Override
@@ -99,7 +99,7 @@ public class SortMergeJoinDescriptor extends 
AbstractJoinDescriptor {
                        inputOrders = tmp;
                }
                
-               return new DualInputPlanNode(node, "Join("+node.getOperator().getName()+")", in1, in2, DriverStrategy.MERGE, this.keys1, this.keys2, inputOrders);
+               return new DualInputPlanNode(node, "Join(" + node.getOperator().getName() + ")", in1, in2, DriverStrategy.INNER_MERGE, this.keys1, this.keys2, inputOrders);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2da82f91/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
index b04cdd8..dc99fd7 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
@@ -480,7 +480,7 @@ public class PlanJSONDumpGenerator {
                                locString = "Nested Loops (Streamed Outer: " + 
child2name + ")";
                                break;
 
-                       case MERGE:
+                       case INNER_MERGE:
                                locString = "Merge";
                                break;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2da82f91/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
index b3718b0..de1508b 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
@@ -103,7 +103,7 @@ public class JoinTranslationTest extends CompilerTestBase {
                        DualInputPlanNode node = 
createPlanAndGetJoinNode(JoinHint.REPARTITION_SORT_MERGE);
                        assertEquals(ShipStrategyType.PARTITION_HASH, 
node.getInput1().getShipStrategy());
                        assertEquals(ShipStrategyType.PARTITION_HASH, 
node.getInput2().getShipStrategy());
-                       assertEquals(DriverStrategy.MERGE, node.getDriverStrategy());
+                       assertEquals(DriverStrategy.INNER_MERGE, node.getDriverStrategy());
                }
                catch (Exception e) {
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/2da82f91/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index f42a275..74f737e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -72,7 +72,7 @@ public enum DriverStrategy {
        ALL_GROUP_COMBINE(AllGroupCombineDriver.class, null, PIPELINED, 0),
 
        // both inputs are merged, but materialized to the side for block-nested-loop-join among values with equal key
-       MERGE(JoinDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
+       INNER_MERGE(JoinDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
 
        LEFT_OUTER_MERGE(LeftOuterJoinDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2da82f91/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
index 5df715f..95e98ce 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
@@ -125,7 +125,7 @@ public class JoinDriver<IT1, IT2, OT> implements 
PactDriver<FlatJoinFunction<IT1
                // create and return joining iterator according to provided 
local strategy.
                if (objectReuseEnabled) {
                        switch (ls) {
-                               case MERGE:
+                               case INNER_MERGE:
                                        this.joinIterator = new 
ReusingMergeInnerJoinIterator<>(in1, in2, 
                                                        serializer1, 
comparator1,
                                                        serializer2, 
comparator2,
@@ -157,7 +157,7 @@ public class JoinDriver<IT1, IT2, OT> implements 
PactDriver<FlatJoinFunction<IT1
                        }
                } else {
                        switch (ls) {
-                               case MERGE:
+                               case INNER_MERGE:
                                        this.joinIterator = new 
NonReusingMergeInnerJoinIterator<>(in1, in2,
                                                        serializer1, 
comparator1,
                                                        serializer2, 
comparator2,

http://git-wip-us.apache.org/repos/asf/flink/blob/2da82f91/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
index 30c1610..6f7fb21 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
@@ -76,7 +76,7 @@ public class MatchTaskExternalITCase extends 
DriverTestBase<FlatJoinFunction<Rec
                addDriverComparator(this.comparator1);
                addDriverComparator(this.comparator2);
                
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
                getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
                setNumFileHandlesForSort(4);
                

http://git-wip-us.apache.org/repos/asf/flink/blob/2da82f91/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
index 15f3d0c..6c4659d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
@@ -89,7 +89,7 @@ public class MatchTaskTest extends 
DriverTestBase<FlatJoinFunction<Record, Recor
                addDriverComparator(this.comparator1);
                addDriverComparator(this.comparator2);
                
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
                getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
                setNumFileHandlesForSort(4);
                
@@ -123,7 +123,7 @@ public class MatchTaskTest extends 
DriverTestBase<FlatJoinFunction<Record, Recor
                addDriverComparator(this.comparator1);
                addDriverComparator(this.comparator2);
                
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
                getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
                setNumFileHandlesForSort(4);
                
@@ -159,7 +159,7 @@ public class MatchTaskTest extends 
DriverTestBase<FlatJoinFunction<Record, Recor
                addDriverComparator(this.comparator1);
                addDriverComparator(this.comparator2);
                
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
                getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
                setNumFileHandlesForSort(4);
                
@@ -195,7 +195,7 @@ public class MatchTaskTest extends 
DriverTestBase<FlatJoinFunction<Record, Recor
                addDriverComparator(this.comparator1);
                addDriverComparator(this.comparator2);
                
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
                getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
                setNumFileHandlesForSort(4);
                
@@ -231,7 +231,7 @@ public class MatchTaskTest extends 
DriverTestBase<FlatJoinFunction<Record, Recor
                addDriverComparator(this.comparator1);
                addDriverComparator(this.comparator2);
                
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
                getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
                setNumFileHandlesForSort(4);
                
@@ -267,7 +267,7 @@ public class MatchTaskTest extends 
DriverTestBase<FlatJoinFunction<Record, Recor
                addDriverComparator(this.comparator1);
                addDriverComparator(this.comparator2);
                
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
                getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
                setNumFileHandlesForSort(4);
                
@@ -303,7 +303,7 @@ public class MatchTaskTest extends 
DriverTestBase<FlatJoinFunction<Record, Recor
                addDriverComparator(this.comparator1);
                addDriverComparator(this.comparator2);
                
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
                getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
                setNumFileHandlesForSort(4);
                
@@ -338,7 +338,7 @@ public class MatchTaskTest extends 
DriverTestBase<FlatJoinFunction<Record, Recor
                addDriverComparator(this.comparator1);
                addDriverComparator(this.comparator2);
                
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
                getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
                setNumFileHandlesForSort(4);
                
@@ -374,7 +374,7 @@ public class MatchTaskTest extends 
DriverTestBase<FlatJoinFunction<Record, Recor
                addDriverComparator(this.comparator1);
                addDriverComparator(this.comparator2);
                
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
                getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
                setNumFileHandlesForSort(4);
                
@@ -404,7 +404,7 @@ public class MatchTaskTest extends 
DriverTestBase<FlatJoinFunction<Record, Recor
                        addDriverComparator(this.comparator1);
                        addDriverComparator(this.comparator2);
                        
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-                       getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+                       getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
                        getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
                        setNumFileHandlesForSort(4);
                        
@@ -464,7 +464,7 @@ public class MatchTaskTest extends 
DriverTestBase<FlatJoinFunction<Record, Recor
                        addDriverComparator(this.comparator1);
                        addDriverComparator(this.comparator2);
                        
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-                       getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+                       getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
                        getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
                        setNumFileHandlesForSort(4);
                        
@@ -524,7 +524,7 @@ public class MatchTaskTest extends 
DriverTestBase<FlatJoinFunction<Record, Recor
                        addDriverComparator(this.comparator1);
                        addDriverComparator(this.comparator2);
                        
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-                       getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+                       getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
                        getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
                        setNumFileHandlesForSort(4);
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/2da82f91/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
index f58486b..f4efb8a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
@@ -303,7 +303,7 @@ public class RelationalQueryCompilerTest extends 
CompilerTestBase {
        }
        
        private boolean checkBroadcastMergeJoin(DualInputPlanNode join, 
SingleInputPlanNode reducer) {
-               if (DriverStrategy.MERGE == join.getDriverStrategy()) {
+               if (DriverStrategy.INNER_MERGE == join.getDriverStrategy()) {
                        // driver keys
                        Assert.assertEquals(set0, join.getKeysForInput1());
                        Assert.assertEquals(set0, join.getKeysForInput2());
@@ -327,7 +327,7 @@ public class RelationalQueryCompilerTest extends 
CompilerTestBase {
        }
        
        private boolean checkRepartitionMergeJoin(DualInputPlanNode join, 
SingleInputPlanNode reducer) {
-               if (DriverStrategy.MERGE == join.getDriverStrategy()) {
+               if (DriverStrategy.INNER_MERGE == join.getDriverStrategy()) {
                        // driver keys
                        Assert.assertEquals(set0, join.getKeysForInput1());
                        Assert.assertEquals(set0, join.getKeysForInput2());

http://git-wip-us.apache.org/repos/asf/flink/blob/2da82f91/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
index de5fde0..99402a5 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
@@ -93,7 +93,7 @@ public class ConnectedComponentsCoGroupTest extends 
CompilerTestBase {
                Assert.assertEquals(DriverStrategy.NONE, 
vertexSource.getDriverStrategy());
                Assert.assertEquals(DriverStrategy.NONE, 
edgesSource.getDriverStrategy());
                
-               Assert.assertEquals(DriverStrategy.MERGE, neighborsJoin.getDriverStrategy());
+               Assert.assertEquals(DriverStrategy.INNER_MERGE, neighborsJoin.getDriverStrategy());
                Assert.assertEquals(set0, neighborsJoin.getKeysForInput1());
                Assert.assertEquals(set0, neighborsJoin.getKeysForInput2());
                

Reply via email to