Repository: flink Updated Branches: refs/heads/master 24f7fa9ef -> 2da82f915
[FLINK-2106] [runtime] Rename DriverStrategy.MERGE to DriverStrategy.INNER_MERGE This closes #1052 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2da82f91 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2da82f91 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2da82f91 Branch: refs/heads/master Commit: 2da82f9154f9563e3fe8454e153d372b4bf18996 Parents: f3dee23 Author: r-pogalz <r.pog...@campus.tu-berlin.de> Authored: Tue Aug 11 21:40:21 2015 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Tue Sep 8 16:57:51 2015 +0200 ---------------------------------------------------------------------- .../flink/optimizer/costs/CostEstimator.java | 2 +- .../operators/SortMergeJoinDescriptor.java | 4 ++-- .../plandump/PlanJSONDumpGenerator.java | 2 +- .../optimizer/java/JoinTranslationTest.java | 2 +- .../flink/runtime/operators/DriverStrategy.java | 2 +- .../flink/runtime/operators/JoinDriver.java | 4 ++-- .../operators/MatchTaskExternalITCase.java | 2 +- .../flink/runtime/operators/MatchTaskTest.java | 24 ++++++++++---------- .../examples/RelationalQueryCompilerTest.java | 4 ++-- .../ConnectedComponentsCoGroupTest.java | 2 +- 10 files changed, 24 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2da82f91/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java index 3a02735..6a3ff09 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java @@ -195,7 +195,7 @@ public abstract class CostEstimator { // pipelined local union is for free break; - case MERGE: + case INNER_MERGE: addLocalMergeCost(firstInput, secondInput, driverCosts, costWeight); break; case HYBRIDHASH_BUILD_FIRST: http://git-wip-us.apache.org/repos/asf/flink/blob/2da82f91/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java index 356836a..3ab0aa7 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java @@ -49,7 +49,7 @@ public class SortMergeJoinDescriptor extends AbstractJoinDescriptor { @Override public DriverStrategy getStrategy() { - return DriverStrategy.MERGE; + return DriverStrategy.INNER_MERGE; } @Override @@ -99,7 +99,7 @@ public class SortMergeJoinDescriptor extends AbstractJoinDescriptor { inputOrders = tmp; } - return new DualInputPlanNode(node, "Join("+node.getOperator().getName()+")", in1, in2, DriverStrategy.MERGE, this.keys1, this.keys2, inputOrders); + return new DualInputPlanNode(node, "Join(" + node.getOperator().getName() + ")", in1, in2, DriverStrategy.INNER_MERGE, this.keys1, this.keys2, inputOrders); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/2da82f91/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java index b04cdd8..dc99fd7 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java @@ -480,7 +480,7 @@ public class PlanJSONDumpGenerator { locString = "Nested Loops (Streamed Outer: " + child2name + ")"; break; - case MERGE: + case INNER_MERGE: locString = "Merge"; break; http://git-wip-us.apache.org/repos/asf/flink/blob/2da82f91/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java index b3718b0..de1508b 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java @@ -103,7 +103,7 @@ public class JoinTranslationTest extends CompilerTestBase { DualInputPlanNode node = createPlanAndGetJoinNode(JoinHint.REPARTITION_SORT_MERGE); assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput1().getShipStrategy()); assertEquals(ShipStrategyType.PARTITION_HASH, node.getInput2().getShipStrategy()); - assertEquals(DriverStrategy.MERGE, node.getDriverStrategy()); + assertEquals(DriverStrategy.INNER_MERGE, node.getDriverStrategy()); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/2da82f91/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java index f42a275..74f737e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java @@ -72,7 +72,7 @@ public enum DriverStrategy { ALL_GROUP_COMBINE(AllGroupCombineDriver.class, null, PIPELINED, 0), // both inputs are merged, but materialized to the side for block-nested-loop-join among values with equal key - MERGE(JoinDriver.class, null, MATERIALIZING, MATERIALIZING, 2), + INNER_MERGE(JoinDriver.class, null, MATERIALIZING, MATERIALIZING, 2), LEFT_OUTER_MERGE(LeftOuterJoinDriver.class, null, MATERIALIZING, MATERIALIZING, 2), http://git-wip-us.apache.org/repos/asf/flink/blob/2da82f91/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java index 5df715f..95e98ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java @@ -125,7 +125,7 @@ public class JoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1 // create and return joining iterator according to provided local strategy. if (objectReuseEnabled) { switch (ls) { - case MERGE: + case INNER_MERGE: this.joinIterator = new ReusingMergeInnerJoinIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, @@ -157,7 +157,7 @@ public class JoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1 } } else { switch (ls) { - case MERGE: + case INNER_MERGE: this.joinIterator = new NonReusingMergeInnerJoinIterator<>(in1, in2, serializer1, comparator1, serializer2, comparator2, http://git-wip-us.apache.org/repos/asf/flink/blob/2da82f91/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java index 30c1610..6f7fb21 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java @@ -76,7 +76,7 @@ public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Rec addDriverComparator(this.comparator1); addDriverComparator(this.comparator2); getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.MERGE); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); getTaskConfig().setRelativeMemoryDriver(bnljn_frac); setNumFileHandlesForSort(4); http://git-wip-us.apache.org/repos/asf/flink/blob/2da82f91/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java index 15f3d0c..6c4659d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java @@ -89,7 +89,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor addDriverComparator(this.comparator1); addDriverComparator(this.comparator2); getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.MERGE); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); getTaskConfig().setRelativeMemoryDriver(bnljn_frac); setNumFileHandlesForSort(4); @@ -123,7 +123,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor addDriverComparator(this.comparator1); addDriverComparator(this.comparator2); getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.MERGE); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); getTaskConfig().setRelativeMemoryDriver(bnljn_frac); setNumFileHandlesForSort(4); @@ -159,7 +159,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor addDriverComparator(this.comparator1); addDriverComparator(this.comparator2); getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.MERGE); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); getTaskConfig().setRelativeMemoryDriver(bnljn_frac); setNumFileHandlesForSort(4); @@ -195,7 +195,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor addDriverComparator(this.comparator1); addDriverComparator(this.comparator2); getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.MERGE); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); getTaskConfig().setRelativeMemoryDriver(bnljn_frac); setNumFileHandlesForSort(4); @@ -231,7 +231,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor addDriverComparator(this.comparator1); addDriverComparator(this.comparator2); getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.MERGE); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); getTaskConfig().setRelativeMemoryDriver(bnljn_frac); setNumFileHandlesForSort(4); @@ -267,7 +267,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor addDriverComparator(this.comparator1); addDriverComparator(this.comparator2); getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.MERGE); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); getTaskConfig().setRelativeMemoryDriver(bnljn_frac); setNumFileHandlesForSort(4); @@ -303,7 +303,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor addDriverComparator(this.comparator1); addDriverComparator(this.comparator2); getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.MERGE); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); getTaskConfig().setRelativeMemoryDriver(bnljn_frac); setNumFileHandlesForSort(4); @@ -338,7 +338,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor addDriverComparator(this.comparator1); addDriverComparator(this.comparator2); getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.MERGE); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); getTaskConfig().setRelativeMemoryDriver(bnljn_frac); setNumFileHandlesForSort(4); @@ -374,7 +374,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor addDriverComparator(this.comparator1); addDriverComparator(this.comparator2); getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.MERGE); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); getTaskConfig().setRelativeMemoryDriver(bnljn_frac); setNumFileHandlesForSort(4); @@ -404,7 +404,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor addDriverComparator(this.comparator1); addDriverComparator(this.comparator2); getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.MERGE); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); getTaskConfig().setRelativeMemoryDriver(bnljn_frac); setNumFileHandlesForSort(4); @@ -464,7 +464,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor addDriverComparator(this.comparator1); addDriverComparator(this.comparator2); getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.MERGE); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); getTaskConfig().setRelativeMemoryDriver(bnljn_frac); setNumFileHandlesForSort(4); @@ -524,7 +524,7 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor addDriverComparator(this.comparator1); addDriverComparator(this.comparator2); getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.MERGE); + getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); getTaskConfig().setRelativeMemoryDriver(bnljn_frac); setNumFileHandlesForSort(4); http://git-wip-us.apache.org/repos/asf/flink/blob/2da82f91/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java index f58486b..f4efb8a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java @@ -303,7 +303,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase { } private boolean checkBroadcastMergeJoin(DualInputPlanNode join, SingleInputPlanNode reducer) { - if (DriverStrategy.MERGE == join.getDriverStrategy()) { + if (DriverStrategy.INNER_MERGE == join.getDriverStrategy()) { // driver keys Assert.assertEquals(set0, join.getKeysForInput1()); Assert.assertEquals(set0, join.getKeysForInput2()); @@ -327,7 +327,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase { } private boolean checkRepartitionMergeJoin(DualInputPlanNode join, SingleInputPlanNode reducer) { - if (DriverStrategy.MERGE == join.getDriverStrategy()) { + if (DriverStrategy.INNER_MERGE == join.getDriverStrategy()) { // driver keys Assert.assertEquals(set0, join.getKeysForInput1()); Assert.assertEquals(set0, join.getKeysForInput2()); http://git-wip-us.apache.org/repos/asf/flink/blob/2da82f91/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java index de5fde0..99402a5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java @@ -93,7 +93,7 @@ public class ConnectedComponentsCoGroupTest extends CompilerTestBase { Assert.assertEquals(DriverStrategy.NONE, vertexSource.getDriverStrategy()); Assert.assertEquals(DriverStrategy.NONE, edgesSource.getDriverStrategy()); - Assert.assertEquals(DriverStrategy.MERGE, neighborsJoin.getDriverStrategy()); + Assert.assertEquals(DriverStrategy.INNER_MERGE, neighborsJoin.getDriverStrategy()); Assert.assertEquals(set0, neighborsJoin.getKeysForInput1()); Assert.assertEquals(set0, neighborsJoin.getKeysForInput2());