http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java index 0dd7e93..ff0e004 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java @@ -47,7 +47,7 @@ import org.apache.flink.api.java.record.operators.ReduceOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction; import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor; @@ -101,7 +101,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { // ---------- compile plan to nephele job graph to verify that no error is thrown ---------- - NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator(); + JobGraphGenerator jobGen = new JobGraphGenerator(); jobGen.compileJobGraph(oPlan); } catch (Exception e) { e.printStackTrace(); @@ -162,13 +162,13 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { allSinks.add(out3Path); for (SinkPlanNode n : oPlan.getDataSinks()) { - String path = ((FileDataSink) n.getSinkNode().getPactContract()).getFilePath(); + String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath(); Assert.assertTrue("Invalid data sink.", allSinks.remove(path)); } // ---------- compile plan to nephele job graph to verify that no error is thrown ---------- - NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator(); + JobGraphGenerator jobGen = new JobGraphGenerator(); jobGen.compileJobGraph(oPlan); } catch (Exception e) { e.printStackTrace(); @@ -262,7 +262,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { OptimizedPlan oPlan = compileNoStats(plan); - NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator(); + JobGraphGenerator jobGen = new JobGraphGenerator(); //Compile plan to verify that no error is thrown jobGen.compileJobGraph(oPlan); @@ -350,13 +350,13 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { allSinks.add(out3Path); for (SinkPlanNode n : oPlan.getDataSinks()) { - String path = ((FileDataSink) n.getSinkNode().getPactContract()).getFilePath(); + String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath(); Assert.assertTrue("Invalid data sink.", allSinks.remove(path)); } // ---------- compile plan to nephele job graph to verify that no error is thrown ---------- - NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator(); + JobGraphGenerator jobGen = new JobGraphGenerator(); jobGen.compileJobGraph(oPlan); } catch (Exception e) { e.printStackTrace(); @@ -449,7 +449,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { OptimizedPlan oPlan = compileNoStats(plan); - NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator(); + JobGraphGenerator jobGen = new JobGraphGenerator(); //Compile plan to verify that no error is thrown jobGen.compileJobGraph(oPlan); @@ -495,7 +495,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { .input2(ma2) .name("Match 2") .build(); - mat2.setParameter(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_MERGE); + mat2.setParameter(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_MERGE); FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat2); @@ -505,7 +505,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { OptimizedPlan oPlan = compileNoStats(plan); - NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator(); + JobGraphGenerator jobGen = new JobGraphGenerator(); //Compile plan to verify that no error is thrown jobGen.compileJobGraph(oPlan); @@ -555,13 +555,13 @@ public class BranchingPlansCompilerTest extends CompilerTestBase { allSinks.add(out2Path); for (SinkPlanNode n : oPlan.getDataSinks()) { - String path = ((FileDataSink) n.getSinkNode().getPactContract()).getFilePath(); + String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath(); Assert.assertTrue("Invalid data sink.", allSinks.remove(path)); } // ---------- compile plan to nephele job graph to verify that no error is thrown ---------- - NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator(); + JobGraphGenerator jobGen = new JobGraphGenerator(); jobGen.compileJobGraph(oPlan); } catch (Exception e) { e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java index c17ebe8..3e7da6c 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java @@ -33,7 +33,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.optimizer.dag.TempMode; import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.DriverStrategy; @@ -51,7 +51,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase { public void testRightSide() { try { - Plan plan = getTestPlanRightStatic(PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND); + Plan plan = getTestPlanRightStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND); OptimizedPlan oPlan = compileNoStats(plan); @@ -63,7 +63,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase { assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode()); assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode()); - new NepheleJobGraphGenerator().compileJobGraph(oPlan); + new JobGraphGenerator().compileJobGraph(oPlan); } catch (Exception e) { System.err.println(e.getMessage()); @@ -79,7 +79,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase { public void testRightSideCountercheck() { try { - Plan plan = getTestPlanRightStatic(PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST); + Plan plan = getTestPlanRightStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST); OptimizedPlan oPlan = compileNoStats(plan); @@ -91,7 +91,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase { assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode()); assertEquals(TempMode.CACHED, innerJoin.getInput2().getTempMode()); - new NepheleJobGraphGenerator().compileJobGraph(oPlan); + new JobGraphGenerator().compileJobGraph(oPlan); } catch (Exception e) { System.err.println(e.getMessage()); @@ -108,7 +108,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase { public void testLeftSide() { try { - Plan plan = getTestPlanLeftStatic(PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST); + Plan plan = getTestPlanLeftStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST); OptimizedPlan oPlan = compileNoStats(plan); @@ -120,7 +120,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase { assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode()); assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode()); - new NepheleJobGraphGenerator().compileJobGraph(oPlan); + new JobGraphGenerator().compileJobGraph(oPlan); } catch (Exception e) { System.err.println(e.getMessage()); @@ -136,7 +136,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase { public void testLeftSideCountercheck() { try { - Plan plan = getTestPlanLeftStatic(PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND); + Plan plan = getTestPlanLeftStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND); OptimizedPlan oPlan = compileNoStats(plan); @@ -148,7 +148,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase { assertEquals(TempMode.CACHED, innerJoin.getInput1().getTempMode()); assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode()); - new NepheleJobGraphGenerator().compileJobGraph(oPlan); + new JobGraphGenerator().compileJobGraph(oPlan); } catch (Exception e) { System.err.println(e.getMessage()); @@ -191,7 +191,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase { assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode()); assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode()); - new NepheleJobGraphGenerator().compileJobGraph(oPlan); + new JobGraphGenerator().compileJobGraph(oPlan); } catch (Exception e) { System.err.println(e.getMessage()); @@ -212,10 +212,10 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase { IterativeDataSet<Tuple3<Long, Long, Long>> iteration = bigInput.iterate(10); Configuration joinStrategy = new Configuration(); - joinStrategy.setString(PactCompiler.HINT_SHIP_STRATEGY, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH); + joinStrategy.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH); if(strategy != "") { - joinStrategy.setString(PactCompiler.HINT_LOCAL_STRATEGY, strategy); + joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy); } DataSet<Tuple3<Long, Long, Long>> inner = iteration.join(smallInput).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy); @@ -243,7 +243,7 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase { IterativeDataSet<Tuple3<Long, Long, Long>> iteration = bigInput.iterate(10); Configuration joinStrategy = new Configuration(); - joinStrategy.setString(PactCompiler.HINT_LOCAL_STRATEGY, strategy); + joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy); DataSet<Tuple3<Long, Long, Long>> inner = smallInput.join(iteration).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java index 5265e3a..565d992 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java @@ -59,9 +59,9 @@ public abstract class CompilerTestBase implements java.io.Serializable { protected transient DataStatistics dataStats; - protected transient PactCompiler withStatsCompiler; + protected transient Optimizer withStatsCompiler; - protected transient PactCompiler noStatsCompiler; + protected transient Optimizer noStatsCompiler; private transient int statCounter; @@ -70,10 +70,10 @@ public abstract class CompilerTestBase implements java.io.Serializable { @Before public void setup() { this.dataStats = new DataStatistics(); - this.withStatsCompiler = new PactCompiler(this.dataStats, new DefaultCostEstimator()); + this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator()); this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM); - this.noStatsCompiler = new PactCompiler(null, new DefaultCostEstimator()); + this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator()); this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM); } @@ -111,7 +111,7 @@ public abstract class CompilerTestBase implements java.io.Serializable { HashMap<String, ArrayList<PlanNode>> map = new HashMap<String, ArrayList<PlanNode>>(); for (PlanNode n : p.getAllNodes()) { - Operator<?> c = n.getOriginalOptimizerNode().getPactContract(); + Operator<?> c = n.getOriginalOptimizerNode().getOperator(); String name = c.getName(); ArrayList<PlanNode> list = map.get(name); @@ -124,7 +124,7 @@ public abstract class CompilerTestBase implements java.io.Serializable { boolean shouldAdd = true; for (Iterator<PlanNode> iter = list.iterator(); iter.hasNext();) { PlanNode in = iter.next(); - if (in.getOriginalOptimizerNode().getPactContract() == c) { + if (in.getOriginalOptimizerNode().getOperator() == c) { // is this the child or is our node the child if (in instanceof SingleInputPlanNode && n instanceof SingleInputPlanNode) { SingleInputPlanNode thisNode = (SingleInputPlanNode) n; http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java index 7fa331a..b17e777 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java @@ -30,7 +30,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.PlanNode; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.optimizer.util.DummyInputFormat; import org.apache.flink.optimizer.util.DummyMatchStub; import org.apache.flink.optimizer.util.DummyOutputFormat; @@ -317,7 +317,7 @@ public class DOPChangeTest extends CompilerTestBase { OptimizedPlan oPlan = compileNoStats(plan); - NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator(); + JobGraphGenerator jobGen = new JobGraphGenerator(); //Compile plan to verify that no error is thrown jobGen.compileJobGraph(oPlan); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java index b8809d7..aaee975 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java @@ -23,7 +23,7 @@ import static org.junit.Assert.*; import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.junit.Test; @SuppressWarnings("serial") @@ -41,7 +41,7 @@ public class DisjointDataFlowsTest extends CompilerTestBase { Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); - new NepheleJobGraphGenerator().compileJobGraph(op); + new JobGraphGenerator().compileJobGraph(op); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java index d97f855..34aa9f8 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java @@ -78,10 +78,10 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io assertEquals(new FieldList(0, 1), reduceNode.getInput().getLocalStrategyKeys()); // check DOP - assertEquals(6, sourceNode.getDegreeOfParallelism()); - assertEquals(6, combineNode.getDegreeOfParallelism()); - assertEquals(8, reduceNode.getDegreeOfParallelism()); - assertEquals(8, sinkNode.getDegreeOfParallelism()); + assertEquals(6, sourceNode.getParallelism()); + assertEquals(6, combineNode.getParallelism()); + assertEquals(8, reduceNode.getParallelism()); + assertEquals(8, sinkNode.getParallelism()); } catch (Exception e) { System.err.println(e.getMessage()); @@ -136,13 +136,13 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys()); // check DOP - assertEquals(6, sourceNode.getDegreeOfParallelism()); - assertEquals(6, keyExtractor.getDegreeOfParallelism()); - assertEquals(6, combineNode.getDegreeOfParallelism()); + assertEquals(6, sourceNode.getParallelism()); + assertEquals(6, keyExtractor.getParallelism()); + assertEquals(6, combineNode.getParallelism()); - assertEquals(8, reduceNode.getDegreeOfParallelism()); - assertEquals(8, keyProjector.getDegreeOfParallelism()); - assertEquals(8, sinkNode.getDegreeOfParallelism()); + assertEquals(8, reduceNode.getParallelism()); + assertEquals(8, keyProjector.getParallelism()); + assertEquals(8, sinkNode.getParallelism()); } catch (Exception e) { System.err.println(e.getMessage()); @@ -192,10 +192,10 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys()); // check DOP - assertEquals(6, sourceNode.getDegreeOfParallelism()); - assertEquals(6, combineNode.getDegreeOfParallelism()); - assertEquals(8, reduceNode.getDegreeOfParallelism()); - assertEquals(8, sinkNode.getDegreeOfParallelism()); + assertEquals(6, sourceNode.getParallelism()); + assertEquals(6, combineNode.getParallelism()); + assertEquals(8, reduceNode.getParallelism()); + assertEquals(8, sinkNode.getParallelism()); } catch (Exception e) { System.err.println(e.getMessage()); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java index 146c085..6dadc19 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java @@ -25,7 +25,7 @@ import org.apache.flink.api.java.record.operators.FileDataSource; import org.apache.flink.api.java.record.operators.MapOperator; import org.apache.flink.api.java.record.operators.ReduceOperator; import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.optimizer.util.DummyCrossStub; import org.apache.flink.optimizer.util.DummyInputFormat; import org.apache.flink.optimizer.util.DummyOutputFormat; @@ -74,7 +74,7 @@ public class HardPlansCompilationTest extends CompilerTestBase { plan.setDefaultParallelism(DEFAULT_PARALLELISM); OptimizedPlan oPlan = compileNoStats(plan); - NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator(); + JobGraphGenerator jobGen = new JobGraphGenerator(); jobGen.compileJobGraph(oPlan); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java index dc6fcad..ac4f820 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java @@ -42,7 +42,7 @@ import org.apache.flink.optimizer.plan.Channel; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor; import org.apache.flink.optimizer.testfunctions.IdentityMapper; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; @@ -79,7 +79,7 @@ public class IterationsCompilerTest extends CompilerTestBase { new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(p); // check that the JobGraphGenerator accepts the plan - new NepheleJobGraphGenerator().compileJobGraph(p); + new JobGraphGenerator().compileJobGraph(p); } catch (Exception e) { e.printStackTrace(); @@ -116,7 +116,7 @@ public class IterationsCompilerTest extends CompilerTestBase { assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy()); assertTrue(wipn.getInput2().getTempMode().breaksPipeline()); - new NepheleJobGraphGenerator().compileJobGraph(op); + new JobGraphGenerator().compileJobGraph(op); } catch (Exception e) { System.err.println(e.getMessage()); @@ -152,7 +152,7 @@ public class IterationsCompilerTest extends CompilerTestBase { assertEquals(ShipStrategyType.PARTITION_HASH, wipn.getInput1().getShipStrategy()); assertTrue(wipn.getInput2().getTempMode().breaksPipeline()); - new NepheleJobGraphGenerator().compileJobGraph(op); + new JobGraphGenerator().compileJobGraph(op); } catch (Exception e) { System.err.println(e.getMessage()); @@ -188,7 +188,7 @@ public class IterationsCompilerTest extends CompilerTestBase { assertEquals(ShipStrategyType.FORWARD, wipn.getInput1().getShipStrategy()); assertTrue(wipn.getInput2().getTempMode().breaksPipeline()); - new NepheleJobGraphGenerator().compileJobGraph(op); + new JobGraphGenerator().compileJobGraph(op); } catch (Exception e) { System.err.println(e.getMessage()); @@ -222,7 +222,7 @@ public class IterationsCompilerTest extends CompilerTestBase { assertEquals(ShipStrategyType.PARTITION_HASH, c.getShipStrategy()); } - new NepheleJobGraphGenerator().compileJobGraph(op); + new JobGraphGenerator().compileJobGraph(op); } catch (Exception e) { System.err.println(e.getMessage()); @@ -299,7 +299,7 @@ public class IterationsCompilerTest extends CompilerTestBase { Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); - new NepheleJobGraphGenerator().compileJobGraph(op); + new JobGraphGenerator().compileJobGraph(op); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java index d6852f5..e65758f 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java @@ -27,7 +27,7 @@ import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.operators.IterativeDataSet; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction; import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor; import org.apache.flink.optimizer.testfunctions.IdentityMapper; @@ -133,7 +133,7 @@ public class NestedIterationsTest extends CompilerTestBase { OptimizedPlan op = compileNoStats(p); // job graph generator should be able to translate this - new NepheleJobGraphGenerator().compileJobGraph(op); + new JobGraphGenerator().compileJobGraph(op); } catch (Exception e) { e.printStackTrace(); @@ -171,7 +171,7 @@ public class NestedIterationsTest extends CompilerTestBase { OptimizedPlan op = compileNoStats(p); // job graph generator should be able to translate this - new NepheleJobGraphGenerator().compileJobGraph(op); + new JobGraphGenerator().compileJobGraph(op); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java index e5983d9..86f01b0 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java @@ -149,7 +149,7 @@ public class PipelineBreakerTest extends CompilerTestBase { DataSet<Long> initialSource = env.generateSequence(1, 10); Configuration conf= new Configuration(); - conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST); + conf.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST); initialSource .map(new IdentityMapper<Long>()) .cross(initialSource).withParameters(conf) @@ -171,7 +171,7 @@ public class PipelineBreakerTest extends CompilerTestBase { DataSet<Long> initialSource = env.generateSequence(1, 10); Configuration conf= new Configuration(); - conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND); + conf.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND); initialSource .map(new IdentityMapper<Long>()) .cross(initialSource).withParameters(conf) @@ -194,7 +194,7 @@ public class PipelineBreakerTest extends CompilerTestBase { DataSet<Long> initialSource = env.generateSequence(1, 10); Configuration conf= new Configuration(); - conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST); + conf.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST); initialSource .map(new IdentityMapper<Long>()) .cross(initialSource).withParameters(conf) @@ -217,7 +217,7 @@ public class PipelineBreakerTest extends CompilerTestBase { DataSet<Long> initialSource = env.generateSequence(1, 10); Configuration conf= new Configuration(); - conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND); + conf.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND); initialSource .map(new IdentityMapper<Long>()) .cross(initialSource).withParameters(conf) http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java index 23fa311..f6885c5 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java @@ -25,7 +25,7 @@ import org.apache.flink.api.java.record.operators.FileDataSink; import org.apache.flink.api.java.record.operators.FileDataSource; import org.apache.flink.api.java.record.operators.ReduceOperator; import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.optimizer.util.DummyInputFormat; import org.apache.flink.optimizer.util.DummyOutputFormat; import org.apache.flink.optimizer.util.IdentityReduce; @@ -51,7 +51,7 @@ public class ReduceAllTest extends CompilerTestBase { try { OptimizedPlan oPlan = compileNoStats(plan); - NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator(); + JobGraphGenerator jobGen = new JobGraphGenerator(); jobGen.compileJobGraph(oPlan); } catch(CompilerException ce) { ce.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java index 8356e94..1fe16bb 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java @@ -63,7 +63,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase { oPlan.accept(new Visitor<PlanNode>() { @Override public boolean preVisit(PlanNode visitable) { - if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof ReduceOperatorBase) { + if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof ReduceOperatorBase) { for (Channel input: visitable.getInputs()) { GlobalProperties gprops = visitable.getGlobalProperties(); LocalProperties lprops = visitable.getLocalProperties(); @@ -78,7 +78,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase { lprops.getGroupedFields().contains(1)); } } - if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof MapOperatorBase) { + if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof MapOperatorBase) { for (Channel input: visitable.getInputs()) { GlobalProperties gprops = visitable.getGlobalProperties(); LocalProperties lprops = visitable.getLocalProperties(); @@ -124,7 +124,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase { oPlan.accept(new Visitor<PlanNode>() { @Override public boolean preVisit(PlanNode visitable) { - if (visitable instanceof DualInputPlanNode && visitable.getPactContract() instanceof JoinOperatorBase) { + if (visitable instanceof DualInputPlanNode && visitable.getProgramOperator() instanceof JoinOperatorBase) { DualInputPlanNode node = ((DualInputPlanNode) visitable); final Channel inConn1 = node.getInput1(); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java index 6647483..92b4fc5 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java @@ -30,7 +30,7 @@ import org.apache.flink.optimizer.plan.Channel; import org.apache.flink.optimizer.plan.NAryUnionPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.junit.Test; @SuppressWarnings("serial") @@ -80,7 +80,7 @@ public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase { assertEquals(0.5, mixedUnion.getInput1().getRelativeTempMemory(), 0.0); assertEquals(0.0, mixedUnion.getInput2().getRelativeTempMemory(), 0.0); - new NepheleJobGraphGenerator().compileJobGraph(op); + new JobGraphGenerator().compileJobGraph(op); } catch (Exception e) { e.printStackTrace(); @@ -133,7 +133,7 @@ public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase { assertTrue(c.isOnDynamicPath()); } - new NepheleJobGraphGenerator().compileJobGraph(op); + new JobGraphGenerator().compileJobGraph(op); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java index cb4bce4..5d15ed8 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java @@ -43,7 +43,7 @@ import org.apache.flink.optimizer.plan.NAryUnionPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.PlanNode; import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.optimizer.util.DummyInputFormat; import org.apache.flink.optimizer.util.DummyOutputFormat; import org.apache.flink.optimizer.util.IdentityReduce; @@ -78,7 +78,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase { OptimizedPlan oPlan = compileNoStats(plan); - NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator(); + JobGraphGenerator jobGen = new JobGraphGenerator(); // Compile plan to verify that no error is thrown jobGen.compileJobGraph(oPlan); @@ -87,7 +87,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase { @Override public boolean preVisit(PlanNode visitable) { - if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof ReduceOperator) { + if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof ReduceOperator) { for (Channel inConn : visitable.getInputs()) { Assert.assertTrue("Reduce should just forward the input if it is already partitioned", inConn.getShipStrategy() == ShipStrategyType.FORWARD); @@ -126,7 +126,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase { // return the plan Plan plan = env.createProgramPlan("Test union on new java-api"); OptimizedPlan oPlan = compileNoStats(plan); - NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator(); + JobGraphGenerator jobGen = new JobGraphGenerator(); // Compile plan to verify that no error is thrown jobGen.compileJobGraph(oPlan); @@ -139,7 +139,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase { /* Test on the union output connections * It must be under the GroupOperator and the strategy should be forward */ - if (visitable instanceof SingleInputPlanNode && visitable.getPactContract() instanceof GroupReduceOperatorBase){ + if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof GroupReduceOperatorBase){ final Channel inConn = ((SingleInputPlanNode) visitable).getInput(); Assert.assertTrue("Union should just forward the Partitioning", inConn.getShipStrategy() == ShipStrategyType.FORWARD ); @@ -156,7 +156,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase { final Channel inConn = inputs.next(); PlanNode inNode = inConn.getSource(); Assert.assertTrue("Input of Union should be FlatMapOperators", - inNode.getPactContract() instanceof FlatMapOperatorBase); + inNode.getProgramOperator() instanceof FlatMapOperatorBase); Assert.assertTrue("Shipment strategy under union should partition the data", inConn.getShipStrategy() == ShipStrategyType.PARTITION_HASH); } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java index f327259..1e4124c 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java @@ -22,7 +22,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.common.Plan; import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.junit.Test; import static org.junit.Assert.fail; @@ -44,7 +44,7 @@ public class UnionReplacementTest extends CompilerTestBase { Plan plan = env.createProgramPlan(); OptimizedPlan oPlan = compileNoStats(plan); - NepheleJobGraphGenerator jobGen = new NepheleJobGraphGenerator(); + JobGraphGenerator jobGen = new JobGraphGenerator(); jobGen.compileJobGraph(oPlan); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java index d10803e..80c0bda 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java @@ -28,7 +28,7 @@ import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.junit.Test; @SuppressWarnings("serial") @@ -52,7 +52,7 @@ public class WorksetIterationCornerCasesTest extends CompilerTestBase { WorksetIterationPlanNode wipn = (WorksetIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource(); assertTrue(wipn.getSolutionSetPlanNode().getOutgoingChannels().isEmpty()); - NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator(); + JobGraphGenerator jgg = new JobGraphGenerator(); jgg.compileJobGraph(op); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java index f17842e..6e7c0a3 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java @@ -34,7 +34,7 @@ import org.apache.flink.api.java.record.operators.ReduceOperator; import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.optimizer.util.DummyInputFormat; import org.apache.flink.optimizer.util.DummyMatchStub; import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub; @@ -106,7 +106,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase { assertTrue( (ss1 == ShipStrategyType.FORWARD && ss2 == ShipStrategyType.PARTITION_HASH) || (ss2 == ShipStrategyType.FORWARD && ss1 == ShipStrategyType.PARTITION_HASH) ); - new NepheleJobGraphGenerator().compileJobGraph(oPlan); + new JobGraphGenerator().compileJobGraph(oPlan); } @Test @@ -150,7 +150,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase { assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy()); assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(1).getShipStrategy()); - new NepheleJobGraphGenerator().compileJobGraph(oPlan); + new JobGraphGenerator().compileJobGraph(oPlan); } @Test @@ -193,7 +193,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase { assertEquals(1, joinWithSolutionSetNode.getOutgoingChannels().size()); assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy()); - new NepheleJobGraphGenerator().compileJobGraph(oPlan); + new JobGraphGenerator().compileJobGraph(oPlan); } private Plan getRecordTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) { http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java index e876fbb..0273659 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java @@ -31,7 +31,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; @@ -75,7 +75,7 @@ public class BinaryCustomPartitioningCompatibilityTest extends CompilerTestBase assertEquals(partitioner, partitioner1.getInput().getPartitioner()); assertEquals(partitioner, partitioner2.getInput().getPartitioner()); - new NepheleJobGraphGenerator().compileJobGraph(op); + new JobGraphGenerator().compileJobGraph(op); } catch (Exception e) { e.printStackTrace(); @@ -120,7 +120,7 @@ public class BinaryCustomPartitioningCompatibilityTest extends CompilerTestBase assertEquals(partitioner, partitioner1.getInput().getPartitioner()); assertEquals(partitioner, partitioner2.getInput().getPartitioner()); - new NepheleJobGraphGenerator().compileJobGraph(op); + new JobGraphGenerator().compileJobGraph(op); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java index f6d8d0e..d397ea2 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java @@ -64,17 +64,17 @@ public class CustomPartitioningTest extends CompilerTestBase { SingleInputPlanNode balancer = (SingleInputPlanNode) partitioner.getInput().getSource(); assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(parallelism, sink.getDegreeOfParallelism()); + assertEquals(parallelism, sink.getParallelism()); assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy()); - assertEquals(parallelism, mapper.getDegreeOfParallelism()); + assertEquals(parallelism, mapper.getParallelism()); assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy()); assertEquals(part, partitioner.getInput().getPartitioner()); - assertEquals(parallelism, partitioner.getDegreeOfParallelism()); + assertEquals(parallelism, partitioner.getParallelism()); assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput().getShipStrategy()); - assertEquals(parallelism, balancer.getDegreeOfParallelism()); + assertEquals(parallelism, balancer.getParallelism()); } catch (Exception e) { e.printStackTrace(); @@ -134,17 +134,17 @@ public class CustomPartitioningTest extends CompilerTestBase { SingleInputPlanNode balancer = (SingleInputPlanNode) partitioner.getInput().getSource(); assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(parallelism, sink.getDegreeOfParallelism()); + assertEquals(parallelism, sink.getParallelism()); assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy()); - assertEquals(parallelism, mapper.getDegreeOfParallelism()); + assertEquals(parallelism, mapper.getParallelism()); assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy()); assertEquals(part, partitioner.getInput().getPartitioner()); - assertEquals(parallelism, partitioner.getDegreeOfParallelism()); + assertEquals(parallelism, partitioner.getParallelism()); assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput().getShipStrategy()); - assertEquals(parallelism, balancer.getDegreeOfParallelism()); + assertEquals(parallelism, balancer.getParallelism()); } catch (Exception e) { e.printStackTrace(); @@ -206,23 +206,23 @@ public class CustomPartitioningTest extends CompilerTestBase { SingleInputPlanNode balancer = (SingleInputPlanNode) keyExtractor.getInput().getSource(); assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(parallelism, sink.getDegreeOfParallelism()); + assertEquals(parallelism, sink.getParallelism()); assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy()); - assertEquals(parallelism, mapper.getDegreeOfParallelism()); + assertEquals(parallelism, mapper.getParallelism()); assertEquals(ShipStrategyType.FORWARD, keyRemover.getInput().getShipStrategy()); - assertEquals(parallelism, keyRemover.getDegreeOfParallelism()); + assertEquals(parallelism, keyRemover.getParallelism()); assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy()); assertEquals(part, partitioner.getInput().getPartitioner()); - assertEquals(parallelism, partitioner.getDegreeOfParallelism()); + assertEquals(parallelism, partitioner.getParallelism()); assertEquals(ShipStrategyType.FORWARD, keyExtractor.getInput().getShipStrategy()); - assertEquals(parallelism, keyExtractor.getDegreeOfParallelism()); + assertEquals(parallelism, keyExtractor.getParallelism()); assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput().getShipStrategy()); - assertEquals(parallelism, balancer.getDegreeOfParallelism()); + assertEquals(parallelism, balancer.getParallelism()); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java index 2c5d235..cb4bd78 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java @@ -246,7 +246,7 @@ public class DataExchangeModeClosedBranchingTest extends CompilerTestBase { private SinkPlanNode findSink(Collection<SinkPlanNode> collection, String name) { for (SinkPlanNode node : collection) { - String nodeName = node.getOptimizerNode().getPactContract().getName(); + String nodeName = node.getOptimizerNode().getOperator().getName(); if (nodeName != null && nodeName.equals(name)) { return node; } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java index 6c0e88b..6b2691a 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java @@ -171,7 +171,7 @@ public class DataExchangeModeOpenBranchingTest extends CompilerTestBase { private SinkPlanNode findSink(Collection<SinkPlanNode> collection, String name) { for (SinkPlanNode node : collection) { - String nodeName = node.getOptimizerNode().getPactContract().getName(); + String nodeName = node.getOptimizerNode().getOperator().getName(); if (nodeName != null && nodeName.equals(name)) { return node; } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java index dae3c41..fe33635 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java @@ -25,7 +25,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.dag.DataSinkNode; import org.apache.flink.optimizer.dag.OptimizerNode; import org.apache.flink.optimizer.dag.SingleInputNode; @@ -292,8 +292,8 @@ public class PipelineBreakingTest { } private static List<DataSinkNode> convertPlan(Plan p) { - PactCompiler.GraphCreatingVisitor dagCreator = - new PactCompiler.GraphCreatingVisitor(17, p.getExecutionConfig().getExecutionMode()); + Optimizer.GraphCreatingVisitor dagCreator = + new Optimizer.GraphCreatingVisitor(17, p.getExecutionConfig().getExecutionMode()); // create the DAG p.accept(dagCreator); @@ -312,8 +312,8 @@ public class PipelineBreakingTest { rootNode = new SinkJoiner(rootNode, iter.next()); } } - rootNode.accept(new PactCompiler.IdAndEstimatesVisitor(null)); - rootNode.accept(new PactCompiler.BranchesVisitor()); + rootNode.accept(new Optimizer.IdAndEstimatesVisitor(null)); + rootNode.accept(new Optimizer.BranchesVisitor()); return sinks; } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java index 32aeab9..a683968 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java @@ -73,9 +73,9 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy()); // check DOP - assertEquals(1, sourceNode.getDegreeOfParallelism()); - assertEquals(1, reduceNode.getDegreeOfParallelism()); - assertEquals(1, sinkNode.getDegreeOfParallelism()); + assertEquals(1, sourceNode.getParallelism()); + assertEquals(1, reduceNode.getParallelism()); + assertEquals(1, sinkNode.getParallelism()); } catch (Exception e) { System.err.println(e.getMessage()); @@ -121,10 +121,10 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java assertEquals(DriverStrategy.ALL_GROUP_REDUCE_COMBINE, combineNode.getDriverStrategy()); // check DOP - assertEquals(8, sourceNode.getDegreeOfParallelism()); - assertEquals(8, combineNode.getDegreeOfParallelism()); - assertEquals(1, reduceNode.getDegreeOfParallelism()); - assertEquals(1, sinkNode.getDegreeOfParallelism()); + assertEquals(8, sourceNode.getParallelism()); + assertEquals(8, combineNode.getParallelism()); + assertEquals(1, reduceNode.getParallelism()); + assertEquals(1, sinkNode.getParallelism()); } catch (Exception e) { System.err.println(e.getMessage()); @@ -172,9 +172,9 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys()); // check DOP - assertEquals(6, sourceNode.getDegreeOfParallelism()); - assertEquals(8, reduceNode.getDegreeOfParallelism()); - assertEquals(8, sinkNode.getDegreeOfParallelism()); + assertEquals(6, sourceNode.getParallelism()); + assertEquals(8, reduceNode.getParallelism()); + assertEquals(8, sinkNode.getParallelism()); } catch (Exception e) { System.err.println(e.getMessage()); @@ -229,10 +229,10 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys()); // check DOP - assertEquals(6, sourceNode.getDegreeOfParallelism()); - assertEquals(6, combineNode.getDegreeOfParallelism()); - assertEquals(8, reduceNode.getDegreeOfParallelism()); - assertEquals(8, sinkNode.getDegreeOfParallelism()); + assertEquals(6, sourceNode.getParallelism()); + assertEquals(6, combineNode.getParallelism()); + assertEquals(8, reduceNode.getParallelism()); + assertEquals(8, sinkNode.getParallelism()); } catch (Exception e) { System.err.println(e.getMessage()); @@ -285,12 +285,12 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys()); // check DOP - assertEquals(6, sourceNode.getDegreeOfParallelism()); - assertEquals(6, keyExtractor.getDegreeOfParallelism()); + assertEquals(6, sourceNode.getParallelism()); + assertEquals(6, keyExtractor.getParallelism()); - assertEquals(8, reduceNode.getDegreeOfParallelism()); - assertEquals(8, keyProjector.getDegreeOfParallelism()); - assertEquals(8, sinkNode.getDegreeOfParallelism()); + assertEquals(8, reduceNode.getParallelism()); + assertEquals(8, keyProjector.getParallelism()); + assertEquals(8, sinkNode.getParallelism()); } catch (Exception e) { System.err.println(e.getMessage()); @@ -351,13 +351,13 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys()); // check DOP - assertEquals(6, sourceNode.getDegreeOfParallelism()); - assertEquals(6, keyExtractor.getDegreeOfParallelism()); - assertEquals(6, combineNode.getDegreeOfParallelism()); + assertEquals(6, sourceNode.getParallelism()); + assertEquals(6, keyExtractor.getParallelism()); + assertEquals(6, combineNode.getParallelism()); - assertEquals(8, reduceNode.getDegreeOfParallelism()); - assertEquals(8, keyProjector.getDegreeOfParallelism()); - assertEquals(8, sinkNode.getDegreeOfParallelism()); + assertEquals(8, reduceNode.getParallelism()); + assertEquals(8, keyProjector.getParallelism()); + assertEquals(8, sinkNode.getParallelism()); } catch (Exception e) { System.err.println(e.getMessage()); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java index 54b24dd..37a8e81 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java @@ -34,7 +34,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.optimizer.testfunctions.IdentityMapper; import org.junit.Test; @@ -53,7 +53,7 @@ public class IterationCompilerTest extends CompilerTestBase { Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); - new NepheleJobGraphGenerator().compileJobGraph(op); + new JobGraphGenerator().compileJobGraph(op); } catch (Exception e) { e.printStackTrace(); @@ -81,7 +81,7 @@ public class IterationCompilerTest extends CompilerTestBase { Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); - new NepheleJobGraphGenerator().compileJobGraph(op); + new JobGraphGenerator().compileJobGraph(op); } catch (Exception e) { e.printStackTrace(); @@ -120,7 +120,7 @@ public class IterationCompilerTest extends CompilerTestBase { assertTrue(union.getCostWeight() >= 1); // see that the jobgraph generator can translate this - new NepheleJobGraphGenerator().compileJobGraph(op); + new JobGraphGenerator().compileJobGraph(op); } catch (Exception e) { e.printStackTrace(); @@ -179,7 +179,7 @@ public class IterationCompilerTest extends CompilerTestBase { assertTrue(solutionDeltaUnion.isOnDynamicPath()); assertTrue(solutionDeltaUnion.getCostWeight() >= 1); - new NepheleJobGraphGenerator().compileJobGraph(op); + new JobGraphGenerator().compileJobGraph(op); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java index bbdad4a..0724a9f 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java @@ -72,9 +72,9 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S assertEquals(reduceNode, sinkNode.getInput().getSource()); // check DOP - assertEquals(1, sourceNode.getDegreeOfParallelism()); - assertEquals(1, reduceNode.getDegreeOfParallelism()); - assertEquals(1, sinkNode.getDegreeOfParallelism()); + assertEquals(1, sourceNode.getParallelism()); + assertEquals(1, reduceNode.getParallelism()); + assertEquals(1, sinkNode.getParallelism()); } catch (Exception e) { System.err.println(e.getMessage()); @@ -122,10 +122,10 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S assertEquals(DriverStrategy.ALL_REDUCE, combineNode.getDriverStrategy()); // check DOP - assertEquals(8, sourceNode.getDegreeOfParallelism()); - assertEquals(8, combineNode.getDegreeOfParallelism()); - assertEquals(1, reduceNode.getDegreeOfParallelism()); - assertEquals(1, sinkNode.getDegreeOfParallelism()); + assertEquals(8, sourceNode.getParallelism()); + assertEquals(8, combineNode.getParallelism()); + assertEquals(1, reduceNode.getParallelism()); + assertEquals(1, sinkNode.getParallelism()); } catch (Exception e) { System.err.println(e.getMessage()); @@ -180,10 +180,10 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys()); // check DOP - assertEquals(6, sourceNode.getDegreeOfParallelism()); - assertEquals(6, combineNode.getDegreeOfParallelism()); - assertEquals(8, reduceNode.getDegreeOfParallelism()); - assertEquals(8, sinkNode.getDegreeOfParallelism()); + assertEquals(6, sourceNode.getParallelism()); + assertEquals(6, combineNode.getParallelism()); + assertEquals(8, reduceNode.getParallelism()); + assertEquals(8, sinkNode.getParallelism()); } catch (Exception e) { System.err.println(e.getMessage()); @@ -244,13 +244,13 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys()); // check DOP - assertEquals(6, sourceNode.getDegreeOfParallelism()); - assertEquals(6, keyExtractor.getDegreeOfParallelism()); - assertEquals(6, combineNode.getDegreeOfParallelism()); + assertEquals(6, sourceNode.getParallelism()); + assertEquals(6, keyExtractor.getParallelism()); + assertEquals(6, combineNode.getParallelism()); - assertEquals(8, reduceNode.getDegreeOfParallelism()); - assertEquals(8, keyProjector.getDegreeOfParallelism()); - assertEquals(8, sinkNode.getDegreeOfParallelism()); + assertEquals(8, reduceNode.getParallelism()); + assertEquals(8, keyProjector.getParallelism()); + assertEquals(8, sinkNode.getParallelism()); } catch (Exception e) { System.err.println(e.getMessage()); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java index 658cf7a..8720aa7 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java @@ -37,7 +37,7 @@ import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.util.Collector; import org.junit.Test; @@ -93,7 +93,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase { assertTrue( (ss1 == ShipStrategyType.FORWARD && ss2 == ShipStrategyType.PARTITION_HASH) || (ss2 == ShipStrategyType.FORWARD && ss1 == ShipStrategyType.PARTITION_HASH) ); - new NepheleJobGraphGenerator().compileJobGraph(oPlan); + new JobGraphGenerator().compileJobGraph(oPlan); } catch (Exception e) { System.err.println(e.getMessage()); @@ -137,7 +137,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase { assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy()); assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getOutgoingChannels().get(1).getShipStrategy()); - new NepheleJobGraphGenerator().compileJobGraph(oPlan); + new JobGraphGenerator().compileJobGraph(oPlan); } catch (Exception e) { System.err.println(e.getMessage()); @@ -182,7 +182,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase { assertEquals(1, joinWithSolutionSetNode.getOutgoingChannels().size()); assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy()); - new NepheleJobGraphGenerator().compileJobGraph(oPlan); + new JobGraphGenerator().compileJobGraph(oPlan); } catch (Exception e) { System.err.println(e.getMessage()); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java index 21d95ee..e7807c9 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java @@ -26,7 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.CompilerException; import org.apache.flink.optimizer.CompilerTestBase; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction; import org.apache.flink.configuration.Configuration; import org.junit.Test; @@ -42,8 +42,8 @@ public class CoGroupOnConflictingPartitioningsTest extends CompilerTestBase { DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L, 0L)); Configuration cfg = new Configuration(); - cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH); - cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE); + cfg.setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH); + cfg.setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE); input.coGroup(input).where(0).equalTo(0) .with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>()) http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java index 4cfa189..9171cc7 100644 --- a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java @@ -26,7 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.CompilerException; import org.apache.flink.optimizer.CompilerTestBase; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.configuration.Configuration; import org.junit.Test; @@ -41,8 +41,8 @@ public class JoinOnConflictingPartitioningsTest extends CompilerTestBase { DataSet<Tuple2<Long, Long>> input = env.fromElements(new Tuple2<Long, Long>(0L, 0L)); Configuration cfg = new Configuration(); - cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH); - cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE); + cfg.setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH); + cfg.setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE); input.join(input).where(0).equalTo(0) .withParameters(cfg) http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java index 79e04fb..019345f 100644 --- a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java +++ b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java @@ -68,24 +68,24 @@ public class SpargelCompilerTest extends CompilerTestBase { // check the sink SinkPlanNode sink = op.getDataSinks().iterator().next(); assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(DEFAULT_PARALLELISM, sink.getDegreeOfParallelism()); + assertEquals(DEFAULT_PARALLELISM, sink.getParallelism()); // check the iteration WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource(); - assertEquals(DEFAULT_PARALLELISM, iteration.getDegreeOfParallelism()); + assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism()); // check the solution set join and the delta PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode(); assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update functions preserves the partitioning DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta; - assertEquals(DEFAULT_PARALLELISM, ssJoin.getDegreeOfParallelism()); + assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism()); assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy()); assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys()); // check the workset set join DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource(); - assertEquals(DEFAULT_PARALLELISM, edgeJoin.getDegreeOfParallelism()); + assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism()); assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy()); assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy()); assertTrue(edgeJoin.getInput1().getTempMode().isCached()); @@ -143,24 +143,24 @@ public class SpargelCompilerTest extends CompilerTestBase { // check the sink SinkPlanNode sink = op.getDataSinks().iterator().next(); assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); - assertEquals(DEFAULT_PARALLELISM, sink.getDegreeOfParallelism()); + assertEquals(DEFAULT_PARALLELISM, sink.getParallelism()); // check the iteration WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource(); - assertEquals(DEFAULT_PARALLELISM, iteration.getDegreeOfParallelism()); + assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism()); // check the solution set join and the delta PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode(); assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update functions preserves the partitioning DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta; - assertEquals(DEFAULT_PARALLELISM, ssJoin.getDegreeOfParallelism()); + assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism()); assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy()); assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys()); // check the workset set join DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource(); - assertEquals(DEFAULT_PARALLELISM, edgeJoin.getDegreeOfParallelism()); + assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism()); assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy()); assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy()); assertTrue(edgeJoin.getInput1().getTempMode().isCached()); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java index acf20d3..788327a 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java @@ -30,7 +30,7 @@ import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics; import org.apache.flink.api.common.operators.GenericDataSourceBase; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.costs.DefaultCostEstimator; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.PlanNode; @@ -57,9 +57,9 @@ public abstract class CompilerTestBase { protected DataStatistics dataStats; - protected PactCompiler withStatsCompiler; + protected Optimizer withStatsCompiler; - protected PactCompiler noStatsCompiler; + protected Optimizer noStatsCompiler; private int statCounter; @@ -68,10 +68,10 @@ public abstract class CompilerTestBase { @Before public void setup() { this.dataStats = new DataStatistics(); - this.withStatsCompiler = new PactCompiler(this.dataStats, new DefaultCostEstimator()); + this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator()); this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM); - this.noStatsCompiler = new PactCompiler(null, new DefaultCostEstimator()); + this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator()); this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM); } @@ -113,7 +113,7 @@ public abstract class CompilerTestBase { HashMap<String, ArrayList<PlanNode>> map = new HashMap<String, ArrayList<PlanNode>>(); for (PlanNode n : p.getAllNodes()) { - Operator<?> c = n.getOriginalOptimizerNode().getPactContract(); + Operator<?> c = n.getOriginalOptimizerNode().getOperator(); String name = c.getName(); ArrayList<PlanNode> list = map.get(name); @@ -126,7 +126,7 @@ public abstract class CompilerTestBase { boolean shouldAdd = true; for (Iterator<PlanNode> iter = list.iterator(); iter.hasNext();) { PlanNode in = iter.next(); - if (in.getOriginalOptimizerNode().getPactContract() == c) { + if (in.getOriginalOptimizerNode().getOperator() == c) { // is this the child or is our node the child if (in instanceof SingleInputPlanNode && n instanceof SingleInputPlanNode) { SingleInputPlanNode thisNode = (SingleInputPlanNode) n; http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java index 7cd3ff0..67a4797 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java @@ -23,10 +23,10 @@ import akka.actor.ActorRef; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.Plan; import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobClient; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -72,14 +72,14 @@ public abstract class RecordAPITestBase extends AbstractTestBase { Assert.fail("Error: Cannot obtain Pact plan. Did the thest forget to override either 'getPactPlan()' or 'getJobGraph()' ?"); } - PactCompiler pc = new PactCompiler(new DataStatistics()); + Optimizer pc = new Optimizer(new DataStatistics()); OptimizedPlan op = pc.compile(p); if (printPlan) { System.out.println(new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op)); } - NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator(); + JobGraphGenerator jgg = new JobGraphGenerator(); return jgg.compileJobGraph(op); } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java index db1fc4d..44f35e7 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java @@ -24,10 +24,10 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironmentFactory; import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.runtime.client.JobClient; import org.apache.flink.runtime.jobgraph.JobGraph; import org.junit.Assert; @@ -49,7 +49,7 @@ public class TestEnvironment extends ExecutionEnvironment { try { OptimizedPlan op = compileProgram(jobName); - NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator(); + JobGraphGenerator jgg = new JobGraphGenerator(); JobGraph jobGraph = jgg.compileJobGraph(op); ActorRef client = this.executor.getJobClient(); @@ -80,7 +80,7 @@ public class TestEnvironment extends ExecutionEnvironment { private OptimizedPlan compileProgram(String jobName) { Plan p = createProgramPlan(jobName); - PactCompiler pc = new PactCompiler(new DataStatistics()); + Optimizer pc = new Optimizer(new DataStatistics()); return pc.compile(p); } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java index 9ab62e1..400ed3a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java @@ -38,9 +38,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.Plan; import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.util.StringUtils; import org.apache.hadoop.fs.FileSystem; @@ -140,9 +140,9 @@ public abstract class CancellingTestBase { } private JobGraph getJobGraph(final Plan plan) throws Exception { - final PactCompiler pc = new PactCompiler(new DataStatistics()); + final Optimizer pc = new Optimizer(new DataStatistics()); final OptimizedPlan op = pc.compile(plan); - final NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator(); + final JobGraphGenerator jgg = new JobGraphGenerator(); return jgg.compileJobGraph(op); } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java index 1b73c4b..b38b784 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java @@ -27,7 +27,7 @@ import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.plan.SourcePlanNode; import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.util.LocalStrategy; @@ -130,7 +130,7 @@ public class ConnectedComponentsCoGroupTest extends CompilerTestBase { // check the caches Assert.assertTrue(TempMode.CACHED == neighborsJoin.getInput2().getTempMode()); - NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator(); + JobGraphGenerator jgg = new JobGraphGenerator(); jgg.compileJobGraph(optPlan); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java index 2c23eaf..dcc9c15 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java @@ -40,7 +40,7 @@ import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.plan.SourcePlanNode; import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.util.LocalStrategy; @@ -152,7 +152,7 @@ public class ConnectedComponentsTest extends CompilerTestBase { Assert.assertTrue(TempMode.PIPELINE_BREAKER == iter.getInitialWorksetInput().getTempMode() || LocalStrategy.SORT == iter.getInitialWorksetInput().getLocalStrategy()); - NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator(); + JobGraphGenerator jgg = new JobGraphGenerator(); jgg.compileJobGraph(optPlan); } @@ -233,7 +233,7 @@ public class ConnectedComponentsTest extends CompilerTestBase { Assert.assertTrue(TempMode.PIPELINE_BREAKER == iter.getInitialWorksetInput().getTempMode() || LocalStrategy.SORT == iter.getInitialWorksetInput().getLocalStrategy()); - NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator(); + JobGraphGenerator jgg = new JobGraphGenerator(); jgg.compileJobGraph(optPlan); }