http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
index 0dd7e93..ff0e004 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
@@ -47,7 +47,7 @@ import 
org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
 import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
 import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
@@ -101,7 +101,7 @@ public class BranchingPlansCompilerTest extends 
CompilerTestBase {
        
                        // ---------- compile plan to nephele job graph to 
verify that no error is thrown ----------
        
-                       NepheleJobGraphGenerator jobGen = new 
NepheleJobGraphGenerator();
+                       JobGraphGenerator jobGen = new JobGraphGenerator();
                        jobGen.compileJobGraph(oPlan);
                } catch (Exception e) {
                        e.printStackTrace();
@@ -162,13 +162,13 @@ public class BranchingPlansCompilerTest extends 
CompilerTestBase {
                        allSinks.add(out3Path);
                        
                        for (SinkPlanNode n : oPlan.getDataSinks()) {
-                               String path = ((FileDataSink) 
n.getSinkNode().getPactContract()).getFilePath();
+                               String path = ((FileDataSink) 
n.getSinkNode().getOperator()).getFilePath();
                                Assert.assertTrue("Invalid data sink.", 
allSinks.remove(path));
                        }
                        
                        // ---------- compile plan to nephele job graph to 
verify that no error is thrown ----------
                        
-                       NepheleJobGraphGenerator jobGen = new 
NepheleJobGraphGenerator();
+                       JobGraphGenerator jobGen = new JobGraphGenerator();
                        jobGen.compileJobGraph(oPlan);
                } catch (Exception e) {
                        e.printStackTrace();
@@ -262,7 +262,7 @@ public class BranchingPlansCompilerTest extends 
CompilerTestBase {
                        
                        OptimizedPlan oPlan = compileNoStats(plan);
                        
-                       NepheleJobGraphGenerator jobGen = new 
NepheleJobGraphGenerator();
+                       JobGraphGenerator jobGen = new JobGraphGenerator();
                        
                        //Compile plan to verify that no error is thrown
                        jobGen.compileJobGraph(oPlan);
@@ -350,13 +350,13 @@ public class BranchingPlansCompilerTest extends 
CompilerTestBase {
                        allSinks.add(out3Path);
                        
                        for (SinkPlanNode n : oPlan.getDataSinks()) {
-                               String path = ((FileDataSink) 
n.getSinkNode().getPactContract()).getFilePath();
+                               String path = ((FileDataSink) 
n.getSinkNode().getOperator()).getFilePath();
                                Assert.assertTrue("Invalid data sink.", 
allSinks.remove(path));
                        }
                        
                        // ---------- compile plan to nephele job graph to 
verify that no error is thrown ----------
                        
-                       NepheleJobGraphGenerator jobGen = new 
NepheleJobGraphGenerator();
+                       JobGraphGenerator jobGen = new JobGraphGenerator();
                        jobGen.compileJobGraph(oPlan);
                } catch (Exception e) {
                        e.printStackTrace();
@@ -449,7 +449,7 @@ public class BranchingPlansCompilerTest extends 
CompilerTestBase {
                        
                        OptimizedPlan oPlan = compileNoStats(plan);
                        
-                       NepheleJobGraphGenerator jobGen = new 
NepheleJobGraphGenerator();
+                       JobGraphGenerator jobGen = new JobGraphGenerator();
                        
                        //Compile plan to verify that no error is thrown
                        jobGen.compileJobGraph(oPlan);
@@ -495,7 +495,7 @@ public class BranchingPlansCompilerTest extends 
CompilerTestBase {
                                .input2(ma2)
                                .name("Match 2")
                                .build();
-                       mat2.setParameter(PactCompiler.HINT_LOCAL_STRATEGY, 
PactCompiler.HINT_LOCAL_STRATEGY_MERGE);
+                       mat2.setParameter(Optimizer.HINT_LOCAL_STRATEGY, 
Optimizer.HINT_LOCAL_STRATEGY_MERGE);
                        
                        FileDataSink sink = new FileDataSink(new 
DummyOutputFormat(), OUT_FILE, mat2);
                        
@@ -505,7 +505,7 @@ public class BranchingPlansCompilerTest extends 
CompilerTestBase {
                        
                        OptimizedPlan oPlan = compileNoStats(plan);
                        
-                       NepheleJobGraphGenerator jobGen = new 
NepheleJobGraphGenerator();
+                       JobGraphGenerator jobGen = new JobGraphGenerator();
                        
                        //Compile plan to verify that no error is thrown
                        jobGen.compileJobGraph(oPlan);
@@ -555,13 +555,13 @@ public class BranchingPlansCompilerTest extends 
CompilerTestBase {
                        allSinks.add(out2Path);
                        
                        for (SinkPlanNode n : oPlan.getDataSinks()) {
-                               String path = ((FileDataSink) 
n.getSinkNode().getPactContract()).getFilePath();
+                               String path = ((FileDataSink) 
n.getSinkNode().getOperator()).getFilePath();
                                Assert.assertTrue("Invalid data sink.", 
allSinks.remove(path));
                        }
                        
                        // ---------- compile plan to nephele job graph to 
verify that no error is thrown ----------
                        
-                       NepheleJobGraphGenerator jobGen = new 
NepheleJobGraphGenerator();
+                       JobGraphGenerator jobGen = new JobGraphGenerator();
                        jobGen.compileJobGraph(oPlan);
                } catch (Exception e) {
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
index c17ebe8..3e7da6c 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.optimizer.dag.TempMode;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.DriverStrategy;
 
@@ -51,7 +51,7 @@ public class CachedMatchStrategyCompilerTest extends 
CompilerTestBase {
        public void testRightSide() {
                try {
                        
-                       Plan plan = 
getTestPlanRightStatic(PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
+                       Plan plan = 
getTestPlanRightStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
                        
                        OptimizedPlan oPlan = compileNoStats(plan);
        
@@ -63,7 +63,7 @@ public class CachedMatchStrategyCompilerTest extends 
CompilerTestBase {
                        assertEquals(TempMode.NONE, 
innerJoin.getInput1().getTempMode());
                        assertEquals(TempMode.NONE, 
innerJoin.getInput2().getTempMode());
                
-                       new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+                       new JobGraphGenerator().compileJobGraph(oPlan);
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -79,7 +79,7 @@ public class CachedMatchStrategyCompilerTest extends 
CompilerTestBase {
        public void testRightSideCountercheck() {
                try {
                        
-                       Plan plan = 
getTestPlanRightStatic(PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
+                       Plan plan = 
getTestPlanRightStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
                        
                        OptimizedPlan oPlan = compileNoStats(plan);
        
@@ -91,7 +91,7 @@ public class CachedMatchStrategyCompilerTest extends 
CompilerTestBase {
                        assertEquals(TempMode.NONE, 
innerJoin.getInput1().getTempMode());
                        assertEquals(TempMode.CACHED, 
innerJoin.getInput2().getTempMode());
                
-                       new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+                       new JobGraphGenerator().compileJobGraph(oPlan);
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -108,7 +108,7 @@ public class CachedMatchStrategyCompilerTest extends 
CompilerTestBase {
        public void testLeftSide() {
                try {
                        
-                       Plan plan = 
getTestPlanLeftStatic(PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
+                       Plan plan = 
getTestPlanLeftStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
                        
                        OptimizedPlan oPlan = compileNoStats(plan);
        
@@ -120,7 +120,7 @@ public class CachedMatchStrategyCompilerTest extends 
CompilerTestBase {
                        assertEquals(TempMode.NONE, 
innerJoin.getInput1().getTempMode());
                        assertEquals(TempMode.NONE, 
innerJoin.getInput2().getTempMode());
                
-                       new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+                       new JobGraphGenerator().compileJobGraph(oPlan);
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -136,7 +136,7 @@ public class CachedMatchStrategyCompilerTest extends 
CompilerTestBase {
        public void testLeftSideCountercheck() {
                try {
                        
-                       Plan plan = 
getTestPlanLeftStatic(PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
+                       Plan plan = 
getTestPlanLeftStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
                        
                        OptimizedPlan oPlan = compileNoStats(plan);
        
@@ -148,7 +148,7 @@ public class CachedMatchStrategyCompilerTest extends 
CompilerTestBase {
                        assertEquals(TempMode.CACHED, 
innerJoin.getInput1().getTempMode());
                        assertEquals(TempMode.NONE, 
innerJoin.getInput2().getTempMode());
                
-                       new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+                       new JobGraphGenerator().compileJobGraph(oPlan);
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -191,7 +191,7 @@ public class CachedMatchStrategyCompilerTest extends 
CompilerTestBase {
                        assertEquals(TempMode.NONE, 
innerJoin.getInput1().getTempMode());
                        assertEquals(TempMode.NONE, 
innerJoin.getInput2().getTempMode());
                
-                       new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+                       new JobGraphGenerator().compileJobGraph(oPlan);
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -212,10 +212,10 @@ public class CachedMatchStrategyCompilerTest extends 
CompilerTestBase {
                IterativeDataSet<Tuple3<Long, Long, Long>> iteration = 
bigInput.iterate(10);
                
                Configuration joinStrategy = new Configuration();
-               joinStrategy.setString(PactCompiler.HINT_SHIP_STRATEGY, 
PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH);
+               joinStrategy.setString(Optimizer.HINT_SHIP_STRATEGY, 
Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
                
                if(strategy != "") {
-                       
joinStrategy.setString(PactCompiler.HINT_LOCAL_STRATEGY, strategy);
+                       joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, 
strategy);
                }
                
                DataSet<Tuple3<Long, Long, Long>> inner = 
iteration.join(smallInput).where(0).equalTo(0).with(new 
DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
@@ -243,7 +243,7 @@ public class CachedMatchStrategyCompilerTest extends 
CompilerTestBase {
                IterativeDataSet<Tuple3<Long, Long, Long>> iteration = 
bigInput.iterate(10);
                
                Configuration joinStrategy = new Configuration();
-               joinStrategy.setString(PactCompiler.HINT_LOCAL_STRATEGY, 
strategy);
+               joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy);
                
                DataSet<Tuple3<Long, Long, Long>> inner = 
smallInput.join(iteration).where(0).equalTo(0).with(new 
DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
index 5265e3a..565d992 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
@@ -59,9 +59,9 @@ public abstract class CompilerTestBase implements 
java.io.Serializable {
        
        protected transient DataStatistics dataStats;
        
-       protected transient PactCompiler withStatsCompiler;
+       protected transient Optimizer withStatsCompiler;
        
-       protected transient PactCompiler noStatsCompiler;
+       protected transient Optimizer noStatsCompiler;
        
        private transient int statCounter;
        
@@ -70,10 +70,10 @@ public abstract class CompilerTestBase implements 
java.io.Serializable {
        @Before
        public void setup() {
                this.dataStats = new DataStatistics();
-               this.withStatsCompiler = new PactCompiler(this.dataStats, new 
DefaultCostEstimator());
+               this.withStatsCompiler = new Optimizer(this.dataStats, new 
DefaultCostEstimator());
                
this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
                
-               this.noStatsCompiler = new PactCompiler(null, new 
DefaultCostEstimator());
+               this.noStatsCompiler = new Optimizer(null, new 
DefaultCostEstimator());
                
this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
        }
        
@@ -111,7 +111,7 @@ public abstract class CompilerTestBase implements 
java.io.Serializable {
                        HashMap<String, ArrayList<PlanNode>> map = new 
HashMap<String, ArrayList<PlanNode>>();
                        
                        for (PlanNode n : p.getAllNodes()) {
-                               Operator<?> c = 
n.getOriginalOptimizerNode().getPactContract();
+                               Operator<?> c = 
n.getOriginalOptimizerNode().getOperator();
                                String name = c.getName();
                                
                                ArrayList<PlanNode> list = map.get(name);
@@ -124,7 +124,7 @@ public abstract class CompilerTestBase implements 
java.io.Serializable {
                                boolean shouldAdd = true;
                                for (Iterator<PlanNode> iter = list.iterator(); 
iter.hasNext();) {
                                        PlanNode in = iter.next();
-                                       if 
(in.getOriginalOptimizerNode().getPactContract() == c) {
+                                       if 
(in.getOriginalOptimizerNode().getOperator() == c) {
                                                // is this the child or is our 
node the child
                                                if (in instanceof 
SingleInputPlanNode && n instanceof SingleInputPlanNode) {
                                                        SingleInputPlanNode 
thisNode = (SingleInputPlanNode) n;

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
index 7fa331a..b17e777 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.PlanNode;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.util.DummyInputFormat;
 import org.apache.flink.optimizer.util.DummyMatchStub;
 import org.apache.flink.optimizer.util.DummyOutputFormat;
@@ -317,7 +317,7 @@ public class DOPChangeTest extends CompilerTestBase {
                
                OptimizedPlan oPlan = compileNoStats(plan);
                
-               NepheleJobGraphGenerator jobGen = new 
NepheleJobGraphGenerator();
+               JobGraphGenerator jobGen = new JobGraphGenerator();
                
                //Compile plan to verify that no error is thrown
                jobGen.compileJobGraph(oPlan);

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
index b8809d7..aaee975 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.*;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.junit.Test;
 
 @SuppressWarnings("serial")
@@ -41,7 +41,7 @@ public class DisjointDataFlowsTest extends CompilerTestBase {
                        Plan p = env.createProgramPlan();
                        OptimizedPlan op = compileNoStats(p);
                        
-                       new NepheleJobGraphGenerator().compileJobGraph(op);
+                       new JobGraphGenerator().compileJobGraph(op);
                }
                catch (Exception e) {
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
index d97f855..34aa9f8 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -78,10 +78,10 @@ public class DistinctCompilationTest extends 
CompilerTestBase implements java.io
                        assertEquals(new FieldList(0, 1), 
reduceNode.getInput().getLocalStrategyKeys());
 
                        // check DOP
-                       assertEquals(6, sourceNode.getDegreeOfParallelism());
-                       assertEquals(6, combineNode.getDegreeOfParallelism());
-                       assertEquals(8, reduceNode.getDegreeOfParallelism());
-                       assertEquals(8, sinkNode.getDegreeOfParallelism());
+                       assertEquals(6, sourceNode.getParallelism());
+                       assertEquals(6, combineNode.getParallelism());
+                       assertEquals(8, reduceNode.getParallelism());
+                       assertEquals(8, sinkNode.getParallelism());
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -136,13 +136,13 @@ public class DistinctCompilationTest extends 
CompilerTestBase implements java.io
                        assertEquals(new FieldList(0), 
reduceNode.getInput().getLocalStrategyKeys());
 
                        // check DOP
-                       assertEquals(6, sourceNode.getDegreeOfParallelism());
-                       assertEquals(6, keyExtractor.getDegreeOfParallelism());
-                       assertEquals(6, combineNode.getDegreeOfParallelism());
+                       assertEquals(6, sourceNode.getParallelism());
+                       assertEquals(6, keyExtractor.getParallelism());
+                       assertEquals(6, combineNode.getParallelism());
 
-                       assertEquals(8, reduceNode.getDegreeOfParallelism());
-                       assertEquals(8, keyProjector.getDegreeOfParallelism());
-                       assertEquals(8, sinkNode.getDegreeOfParallelism());
+                       assertEquals(8, reduceNode.getParallelism());
+                       assertEquals(8, keyProjector.getParallelism());
+                       assertEquals(8, sinkNode.getParallelism());
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -192,10 +192,10 @@ public class DistinctCompilationTest extends 
CompilerTestBase implements java.io
                        assertEquals(new FieldList(1), 
reduceNode.getInput().getLocalStrategyKeys());
 
                        // check DOP
-                       assertEquals(6, sourceNode.getDegreeOfParallelism());
-                       assertEquals(6, combineNode.getDegreeOfParallelism());
-                       assertEquals(8, reduceNode.getDegreeOfParallelism());
-                       assertEquals(8, sinkNode.getDegreeOfParallelism());
+                       assertEquals(6, sourceNode.getParallelism());
+                       assertEquals(6, combineNode.getParallelism());
+                       assertEquals(8, reduceNode.getParallelism());
+                       assertEquals(8, sinkNode.getParallelism());
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
index 146c085..6dadc19 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
@@ -25,7 +25,7 @@ import 
org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.MapOperator;
 import org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.util.DummyCrossStub;
 import org.apache.flink.optimizer.util.DummyInputFormat;
 import org.apache.flink.optimizer.util.DummyOutputFormat;
@@ -74,7 +74,7 @@ public class HardPlansCompilationTest extends 
CompilerTestBase {
                plan.setDefaultParallelism(DEFAULT_PARALLELISM);
                
                OptimizedPlan oPlan = compileNoStats(plan);
-               NepheleJobGraphGenerator jobGen = new 
NepheleJobGraphGenerator();
+               JobGraphGenerator jobGen = new JobGraphGenerator();
                jobGen.compileJobGraph(oPlan);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
index dc6fcad..ac4f820 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
@@ -42,7 +42,7 @@ import org.apache.flink.optimizer.plan.Channel;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
@@ -79,7 +79,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
                        new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(p);
                        
                        // check that the JobGraphGenerator accepts the plan
-                       new NepheleJobGraphGenerator().compileJobGraph(p);
+                       new JobGraphGenerator().compileJobGraph(p);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -116,7 +116,7 @@ public class IterationsCompilerTest extends 
CompilerTestBase {
                        assertEquals(ShipStrategyType.PARTITION_HASH, 
wipn.getInput1().getShipStrategy());
                        
assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
                        
-                       new NepheleJobGraphGenerator().compileJobGraph(op);
+                       new JobGraphGenerator().compileJobGraph(op);
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -152,7 +152,7 @@ public class IterationsCompilerTest extends 
CompilerTestBase {
                        assertEquals(ShipStrategyType.PARTITION_HASH, 
wipn.getInput1().getShipStrategy());
                        
assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
                        
-                       new NepheleJobGraphGenerator().compileJobGraph(op);
+                       new JobGraphGenerator().compileJobGraph(op);
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -188,7 +188,7 @@ public class IterationsCompilerTest extends 
CompilerTestBase {
                        assertEquals(ShipStrategyType.FORWARD, 
wipn.getInput1().getShipStrategy());
                        
assertTrue(wipn.getInput2().getTempMode().breaksPipeline());
                        
-                       new NepheleJobGraphGenerator().compileJobGraph(op);
+                       new JobGraphGenerator().compileJobGraph(op);
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -222,7 +222,7 @@ public class IterationsCompilerTest extends 
CompilerTestBase {
                                assertEquals(ShipStrategyType.PARTITION_HASH, 
c.getShipStrategy());
                        }
                        
-                       new NepheleJobGraphGenerator().compileJobGraph(op);
+                       new JobGraphGenerator().compileJobGraph(op);
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -299,7 +299,7 @@ public class IterationsCompilerTest extends 
CompilerTestBase {
                        Plan p = env.createProgramPlan();
                        OptimizedPlan op = compileNoStats(p);
                        
-                       new NepheleJobGraphGenerator().compileJobGraph(op);
+                       new JobGraphGenerator().compileJobGraph(op);
                }
                catch (Exception e) {
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
index d6852f5..e65758f 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
 import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
@@ -133,7 +133,7 @@ public class NestedIterationsTest extends CompilerTestBase {
                        OptimizedPlan op = compileNoStats(p);
                        
                        // job graph generator should be able to translate this
-                       new NepheleJobGraphGenerator().compileJobGraph(op);
+                       new JobGraphGenerator().compileJobGraph(op);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -171,7 +171,7 @@ public class NestedIterationsTest extends CompilerTestBase {
                        OptimizedPlan op = compileNoStats(p);
                        
                        // job graph generator should be able to translate this
-                       new NepheleJobGraphGenerator().compileJobGraph(op);
+                       new JobGraphGenerator().compileJobGraph(op);
                }
                catch (Exception e) {
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
index e5983d9..86f01b0 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
@@ -149,7 +149,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
                                DataSet<Long> initialSource = 
env.generateSequence(1, 10);
                                
                                Configuration conf= new Configuration();
-                               
conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, 
PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST);
+                               conf.setString(Optimizer.HINT_LOCAL_STRATEGY, 
Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST);
                                initialSource
                                        .map(new IdentityMapper<Long>())
                                        
.cross(initialSource).withParameters(conf)
@@ -171,7 +171,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
                                DataSet<Long> initialSource = 
env.generateSequence(1, 10);
                                
                                Configuration conf= new Configuration();
-                               
conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, 
PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND);
+                               conf.setString(Optimizer.HINT_LOCAL_STRATEGY, 
Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND);
                                initialSource
                                        .map(new IdentityMapper<Long>())
                                        
.cross(initialSource).withParameters(conf)
@@ -194,7 +194,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
                                DataSet<Long> initialSource = 
env.generateSequence(1, 10);
                                
                                Configuration conf= new Configuration();
-                               
conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, 
PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST);
+                               conf.setString(Optimizer.HINT_LOCAL_STRATEGY, 
Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST);
                                initialSource
                                        .map(new IdentityMapper<Long>())
                                        
.cross(initialSource).withParameters(conf)
@@ -217,7 +217,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
                                DataSet<Long> initialSource = 
env.generateSequence(1, 10);
                                
                                Configuration conf= new Configuration();
-                               
conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, 
PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND);
+                               conf.setString(Optimizer.HINT_LOCAL_STRATEGY, 
Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND);
                                initialSource
                                        .map(new IdentityMapper<Long>())
                                        
.cross(initialSource).withParameters(conf)

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
index 23fa311..f6885c5 100644
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
@@ -25,7 +25,7 @@ import 
org.apache.flink.api.java.record.operators.FileDataSink;
 import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.util.DummyInputFormat;
 import org.apache.flink.optimizer.util.DummyOutputFormat;
 import org.apache.flink.optimizer.util.IdentityReduce;
@@ -51,7 +51,7 @@ public class ReduceAllTest extends CompilerTestBase {
                
                try {
                        OptimizedPlan oPlan = compileNoStats(plan);
-                       NepheleJobGraphGenerator jobGen = new 
NepheleJobGraphGenerator();
+                       JobGraphGenerator jobGen = new JobGraphGenerator();
                        jobGen.compileJobGraph(oPlan);
                } catch(CompilerException ce) {
                        ce.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
index 8356e94..1fe16bb 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
@@ -63,7 +63,7 @@ public class SemanticPropertiesAPIToPlanTest extends 
CompilerTestBase {
                oPlan.accept(new Visitor<PlanNode>() {
                        @Override
                        public boolean preVisit(PlanNode visitable) {
-                               if (visitable instanceof SingleInputPlanNode && 
visitable.getPactContract() instanceof ReduceOperatorBase) {
+                               if (visitable instanceof SingleInputPlanNode && 
visitable.getProgramOperator() instanceof ReduceOperatorBase) {
                                        for (Channel input: 
visitable.getInputs()) {
                                                GlobalProperties gprops = 
visitable.getGlobalProperties();
                                                LocalProperties lprops = 
visitable.getLocalProperties();
@@ -78,7 +78,7 @@ public class SemanticPropertiesAPIToPlanTest extends 
CompilerTestBase {
                                                                
lprops.getGroupedFields().contains(1));
                                        }
                                }
-                               if (visitable instanceof SingleInputPlanNode && 
visitable.getPactContract() instanceof MapOperatorBase) {
+                               if (visitable instanceof SingleInputPlanNode && 
visitable.getProgramOperator() instanceof MapOperatorBase) {
                                        for (Channel input: 
visitable.getInputs()) {
                                                GlobalProperties gprops = 
visitable.getGlobalProperties();
                                                LocalProperties lprops = 
visitable.getLocalProperties();
@@ -124,7 +124,7 @@ public class SemanticPropertiesAPIToPlanTest extends 
CompilerTestBase {
                oPlan.accept(new Visitor<PlanNode>() {
                        @Override
                        public boolean preVisit(PlanNode visitable) {
-                               if (visitable instanceof DualInputPlanNode && 
visitable.getPactContract() instanceof JoinOperatorBase) {
+                               if (visitable instanceof DualInputPlanNode && 
visitable.getProgramOperator() instanceof JoinOperatorBase) {
                                        DualInputPlanNode node = 
((DualInputPlanNode) visitable);
 
                                        final Channel inConn1 = 
node.getInput1();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
index 6647483..92b4fc5 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.optimizer.plan.Channel;
 import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.junit.Test;
 
 @SuppressWarnings("serial")
@@ -80,7 +80,7 @@ public class UnionBetweenDynamicAndStaticPathTest extends 
CompilerTestBase {
                        assertEquals(0.5, 
mixedUnion.getInput1().getRelativeTempMemory(), 0.0);
                        assertEquals(0.0, 
mixedUnion.getInput2().getRelativeTempMemory(), 0.0);
                        
-                       new NepheleJobGraphGenerator().compileJobGraph(op);
+                       new JobGraphGenerator().compileJobGraph(op);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -133,7 +133,7 @@ public class UnionBetweenDynamicAndStaticPathTest extends 
CompilerTestBase {
                                assertTrue(c.isOnDynamicPath());
                        }
                        
-                       new NepheleJobGraphGenerator().compileJobGraph(op);
+                       new JobGraphGenerator().compileJobGraph(op);
                }
                catch (Exception e) {
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
index cb4bce4..5d15ed8 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
@@ -43,7 +43,7 @@ import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.PlanNode;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.util.DummyInputFormat;
 import org.apache.flink.optimizer.util.DummyOutputFormat;
 import org.apache.flink.optimizer.util.IdentityReduce;
@@ -78,7 +78,7 @@ public class UnionPropertyPropagationTest extends 
CompilerTestBase {
                
                OptimizedPlan oPlan = compileNoStats(plan);
                
-               NepheleJobGraphGenerator jobGen = new 
NepheleJobGraphGenerator();
+               JobGraphGenerator jobGen = new JobGraphGenerator();
                
                // Compile plan to verify that no error is thrown
                jobGen.compileJobGraph(oPlan);
@@ -87,7 +87,7 @@ public class UnionPropertyPropagationTest extends 
CompilerTestBase {
                        
                        @Override
                        public boolean preVisit(PlanNode visitable) {
-                               if (visitable instanceof SingleInputPlanNode && 
visitable.getPactContract() instanceof ReduceOperator) {
+                               if (visitable instanceof SingleInputPlanNode && 
visitable.getProgramOperator() instanceof ReduceOperator) {
                                        for (Channel inConn : 
visitable.getInputs()) {
                                                Assert.assertTrue("Reduce 
should just forward the input if it is already partitioned",
                                                                
inConn.getShipStrategy() == ShipStrategyType.FORWARD); 
@@ -126,7 +126,7 @@ public class UnionPropertyPropagationTest extends 
CompilerTestBase {
                // return the plan
                Plan plan = env.createProgramPlan("Test union on new java-api");
                OptimizedPlan oPlan = compileNoStats(plan);
-               NepheleJobGraphGenerator jobGen = new 
NepheleJobGraphGenerator();
+               JobGraphGenerator jobGen = new JobGraphGenerator();
                
                // Compile plan to verify that no error is thrown
                jobGen.compileJobGraph(oPlan);
@@ -139,7 +139,7 @@ public class UnionPropertyPropagationTest extends 
CompilerTestBase {
                                /* Test on the union output connections
                                 * It must be under the GroupOperator and the 
strategy should be forward
                                 */
-                               if (visitable instanceof SingleInputPlanNode && 
visitable.getPactContract() instanceof GroupReduceOperatorBase){
+                               if (visitable instanceof SingleInputPlanNode && 
visitable.getProgramOperator() instanceof GroupReduceOperatorBase){
                                        final Channel inConn = 
((SingleInputPlanNode) visitable).getInput();
                                        Assert.assertTrue("Union should just 
forward the Partitioning",
                                                        
inConn.getShipStrategy() == ShipStrategyType.FORWARD ); 
@@ -156,7 +156,7 @@ public class UnionPropertyPropagationTest extends 
CompilerTestBase {
                                                final Channel inConn = 
inputs.next();
                                                PlanNode inNode = 
inConn.getSource();
                                                Assert.assertTrue("Input of 
Union should be FlatMapOperators",
-                                                               
inNode.getPactContract() instanceof FlatMapOperatorBase);
+                                                               
inNode.getProgramOperator() instanceof FlatMapOperatorBase);
                                                Assert.assertTrue("Shipment 
strategy under union should partition the data",
                                                                
inConn.getShipStrategy() == ShipStrategyType.PARTITION_HASH); 
                                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
index f327259..1e4124c 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.junit.Test;
 
 import static org.junit.Assert.fail;
@@ -44,7 +44,7 @@ public class UnionReplacementTest extends CompilerTestBase {
        
                        Plan plan = env.createProgramPlan();
                        OptimizedPlan oPlan = compileNoStats(plan);
-                       NepheleJobGraphGenerator jobGen = new 
NepheleJobGraphGenerator();
+                       JobGraphGenerator jobGen = new JobGraphGenerator();
                        jobGen.compileJobGraph(oPlan);
                }
                catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
index d10803e..80c0bda 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.junit.Test;
 
 @SuppressWarnings("serial")
@@ -52,7 +52,7 @@ public class WorksetIterationCornerCasesTest extends 
CompilerTestBase {
                        WorksetIterationPlanNode wipn = 
(WorksetIterationPlanNode) 
op.getDataSinks().iterator().next().getInput().getSource();
                        
assertTrue(wipn.getSolutionSetPlanNode().getOutgoingChannels().isEmpty());
                        
-                       NepheleJobGraphGenerator jgg = new 
NepheleJobGraphGenerator();
+                       JobGraphGenerator jgg = new JobGraphGenerator();
                        jgg.compileJobGraph(op);
                }
                catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
index f17842e..6e7c0a3 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
@@ -34,7 +34,7 @@ import 
org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.util.DummyInputFormat;
 import org.apache.flink.optimizer.util.DummyMatchStub;
 import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub;
@@ -106,7 +106,7 @@ public class WorksetIterationsRecordApiCompilerTest extends 
CompilerTestBase {
                assertTrue( (ss1 == ShipStrategyType.FORWARD && ss2 == 
ShipStrategyType.PARTITION_HASH) ||
                                        (ss2 == ShipStrategyType.FORWARD && ss1 
== ShipStrategyType.PARTITION_HASH) );
                
-               new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+               new JobGraphGenerator().compileJobGraph(oPlan);
        }
        
        @Test
@@ -150,7 +150,7 @@ public class WorksetIterationsRecordApiCompilerTest extends 
CompilerTestBase {
                assertEquals(ShipStrategyType.PARTITION_HASH, 
joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
                assertEquals(ShipStrategyType.PARTITION_HASH, 
joinWithSolutionSetNode.getOutgoingChannels().get(1).getShipStrategy());
                
-               new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+               new JobGraphGenerator().compileJobGraph(oPlan);
        }
        
        @Test
@@ -193,7 +193,7 @@ public class WorksetIterationsRecordApiCompilerTest extends 
CompilerTestBase {
                assertEquals(1, 
joinWithSolutionSetNode.getOutgoingChannels().size());
                assertEquals(ShipStrategyType.FORWARD, 
joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
                
-               new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+               new JobGraphGenerator().compileJobGraph(oPlan);
        }
        
        private Plan getRecordTestPlan(boolean joinPreservesSolutionSet, 
boolean mapBeforeSolutionDelta) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
index e876fbb..0273659 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
@@ -75,7 +75,7 @@ public class BinaryCustomPartitioningCompatibilityTest 
extends CompilerTestBase
                        assertEquals(partitioner, 
partitioner1.getInput().getPartitioner());
                        assertEquals(partitioner, 
partitioner2.getInput().getPartitioner());
                        
-                       new NepheleJobGraphGenerator().compileJobGraph(op);
+                       new JobGraphGenerator().compileJobGraph(op);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -120,7 +120,7 @@ public class BinaryCustomPartitioningCompatibilityTest 
extends CompilerTestBase
                        assertEquals(partitioner, 
partitioner1.getInput().getPartitioner());
                        assertEquals(partitioner, 
partitioner2.getInput().getPartitioner());
                        
-                       new NepheleJobGraphGenerator().compileJobGraph(op);
+                       new JobGraphGenerator().compileJobGraph(op);
                }
                catch (Exception e) {
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
index f6d8d0e..d397ea2 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
@@ -64,17 +64,17 @@ public class CustomPartitioningTest extends 
CompilerTestBase {
                        SingleInputPlanNode balancer = (SingleInputPlanNode) 
partitioner.getInput().getSource();
                        
                        assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(parallelism, 
sink.getDegreeOfParallelism());
+                       assertEquals(parallelism, sink.getParallelism());
                        
                        assertEquals(ShipStrategyType.FORWARD, 
mapper.getInput().getShipStrategy());
-                       assertEquals(parallelism, 
mapper.getDegreeOfParallelism());
+                       assertEquals(parallelism, mapper.getParallelism());
                        
                        assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
partitioner.getInput().getShipStrategy());
                        assertEquals(part, 
partitioner.getInput().getPartitioner());
-                       assertEquals(parallelism, 
partitioner.getDegreeOfParallelism());
+                       assertEquals(parallelism, partitioner.getParallelism());
                        
                        
assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, 
balancer.getInput().getShipStrategy());
-                       assertEquals(parallelism, 
balancer.getDegreeOfParallelism());
+                       assertEquals(parallelism, balancer.getParallelism());
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -134,17 +134,17 @@ public class CustomPartitioningTest extends 
CompilerTestBase {
                        SingleInputPlanNode balancer = (SingleInputPlanNode) 
partitioner.getInput().getSource();
                        
                        assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(parallelism, 
sink.getDegreeOfParallelism());
+                       assertEquals(parallelism, sink.getParallelism());
                        
                        assertEquals(ShipStrategyType.FORWARD, 
mapper.getInput().getShipStrategy());
-                       assertEquals(parallelism, 
mapper.getDegreeOfParallelism());
+                       assertEquals(parallelism, mapper.getParallelism());
                        
                        assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
partitioner.getInput().getShipStrategy());
                        assertEquals(part, 
partitioner.getInput().getPartitioner());
-                       assertEquals(parallelism, 
partitioner.getDegreeOfParallelism());
+                       assertEquals(parallelism, partitioner.getParallelism());
                        
                        
assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, 
balancer.getInput().getShipStrategy());
-                       assertEquals(parallelism, 
balancer.getDegreeOfParallelism());
+                       assertEquals(parallelism, balancer.getParallelism());
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -206,23 +206,23 @@ public class CustomPartitioningTest extends 
CompilerTestBase {
                        SingleInputPlanNode balancer = (SingleInputPlanNode) 
keyExtractor.getInput().getSource();
                        
                        assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(parallelism, 
sink.getDegreeOfParallelism());
+                       assertEquals(parallelism, sink.getParallelism());
                        
                        assertEquals(ShipStrategyType.FORWARD, 
mapper.getInput().getShipStrategy());
-                       assertEquals(parallelism, 
mapper.getDegreeOfParallelism());
+                       assertEquals(parallelism, mapper.getParallelism());
                        
                        assertEquals(ShipStrategyType.FORWARD, 
keyRemover.getInput().getShipStrategy());
-                       assertEquals(parallelism, 
keyRemover.getDegreeOfParallelism());
+                       assertEquals(parallelism, keyRemover.getParallelism());
                        
                        assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
partitioner.getInput().getShipStrategy());
                        assertEquals(part, 
partitioner.getInput().getPartitioner());
-                       assertEquals(parallelism, 
partitioner.getDegreeOfParallelism());
+                       assertEquals(parallelism, partitioner.getParallelism());
                        
                        assertEquals(ShipStrategyType.FORWARD, 
keyExtractor.getInput().getShipStrategy());
-                       assertEquals(parallelism, 
keyExtractor.getDegreeOfParallelism());
+                       assertEquals(parallelism, 
keyExtractor.getParallelism());
                        
                        
assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, 
balancer.getInput().getShipStrategy());
-                       assertEquals(parallelism, 
balancer.getDegreeOfParallelism());
+                       assertEquals(parallelism, balancer.getParallelism());
                }
                catch (Exception e) {
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
index 2c5d235..cb4bd78 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
@@ -246,7 +246,7 @@ public class DataExchangeModeClosedBranchingTest extends 
CompilerTestBase {
 
        private SinkPlanNode findSink(Collection<SinkPlanNode> collection, 
String name) {
                for (SinkPlanNode node : collection) {
-                       String nodeName = 
node.getOptimizerNode().getPactContract().getName();
+                       String nodeName = 
node.getOptimizerNode().getOperator().getName();
                        if (nodeName != null && nodeName.equals(name)) {
                                return node;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
index 6c0e88b..6b2691a 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
@@ -171,7 +171,7 @@ public class DataExchangeModeOpenBranchingTest extends 
CompilerTestBase {
 
        private SinkPlanNode findSink(Collection<SinkPlanNode> collection, 
String name) {
                for (SinkPlanNode node : collection) {
-                       String nodeName = 
node.getOptimizerNode().getPactContract().getName();
+                       String nodeName = 
node.getOptimizerNode().getOperator().getName();
                        if (nodeName != null && nodeName.equals(name)) {
                                return node;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
index dae3c41..fe33635 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/PipelineBreakingTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.dag.OptimizerNode;
 import org.apache.flink.optimizer.dag.SingleInputNode;
@@ -292,8 +292,8 @@ public class PipelineBreakingTest {
        }
 
        private static List<DataSinkNode> convertPlan(Plan p) {
-               PactCompiler.GraphCreatingVisitor dagCreator =
-                               new PactCompiler.GraphCreatingVisitor(17, 
p.getExecutionConfig().getExecutionMode());
+               Optimizer.GraphCreatingVisitor dagCreator =
+                               new Optimizer.GraphCreatingVisitor(17, 
p.getExecutionConfig().getExecutionMode());
 
                // create the DAG
                p.accept(dagCreator);
@@ -312,8 +312,8 @@ public class PipelineBreakingTest {
                                rootNode = new SinkJoiner(rootNode, 
iter.next());
                        }
                }
-               rootNode.accept(new PactCompiler.IdAndEstimatesVisitor(null));
-               rootNode.accept(new PactCompiler.BranchesVisitor());
+               rootNode.accept(new Optimizer.IdAndEstimatesVisitor(null));
+               rootNode.accept(new Optimizer.BranchesVisitor());
 
                return sinks;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
index 32aeab9..a683968 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
@@ -73,9 +73,9 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                        assertEquals(DriverStrategy.ALL_GROUP_REDUCE, 
reduceNode.getDriverStrategy());
                        
                        // check DOP
-                       assertEquals(1, sourceNode.getDegreeOfParallelism());
-                       assertEquals(1, reduceNode.getDegreeOfParallelism());
-                       assertEquals(1, sinkNode.getDegreeOfParallelism());
+                       assertEquals(1, sourceNode.getParallelism());
+                       assertEquals(1, reduceNode.getParallelism());
+                       assertEquals(1, sinkNode.getParallelism());
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -121,10 +121,10 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                        assertEquals(DriverStrategy.ALL_GROUP_REDUCE_COMBINE, 
combineNode.getDriverStrategy());
                        
                        // check DOP
-                       assertEquals(8, sourceNode.getDegreeOfParallelism());
-                       assertEquals(8, combineNode.getDegreeOfParallelism());
-                       assertEquals(1, reduceNode.getDegreeOfParallelism());
-                       assertEquals(1, sinkNode.getDegreeOfParallelism());
+                       assertEquals(8, sourceNode.getParallelism());
+                       assertEquals(8, combineNode.getParallelism());
+                       assertEquals(1, reduceNode.getParallelism());
+                       assertEquals(1, sinkNode.getParallelism());
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -172,9 +172,9 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                        assertEquals(new FieldList(1), 
reduceNode.getInput().getLocalStrategyKeys());
                        
                        // check DOP
-                       assertEquals(6, sourceNode.getDegreeOfParallelism());
-                       assertEquals(8, reduceNode.getDegreeOfParallelism());
-                       assertEquals(8, sinkNode.getDegreeOfParallelism());
+                       assertEquals(6, sourceNode.getParallelism());
+                       assertEquals(8, reduceNode.getParallelism());
+                       assertEquals(8, sinkNode.getParallelism());
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -229,10 +229,10 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                        assertEquals(new FieldList(1), 
reduceNode.getInput().getLocalStrategyKeys());
                        
                        // check DOP
-                       assertEquals(6, sourceNode.getDegreeOfParallelism());
-                       assertEquals(6, combineNode.getDegreeOfParallelism());
-                       assertEquals(8, reduceNode.getDegreeOfParallelism());
-                       assertEquals(8, sinkNode.getDegreeOfParallelism());
+                       assertEquals(6, sourceNode.getParallelism());
+                       assertEquals(6, combineNode.getParallelism());
+                       assertEquals(8, reduceNode.getParallelism());
+                       assertEquals(8, sinkNode.getParallelism());
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -285,12 +285,12 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                        assertEquals(new FieldList(0), 
reduceNode.getInput().getLocalStrategyKeys());
                        
                        // check DOP
-                       assertEquals(6, sourceNode.getDegreeOfParallelism());
-                       assertEquals(6, keyExtractor.getDegreeOfParallelism());
+                       assertEquals(6, sourceNode.getParallelism());
+                       assertEquals(6, keyExtractor.getParallelism());
                        
-                       assertEquals(8, reduceNode.getDegreeOfParallelism());
-                       assertEquals(8, keyProjector.getDegreeOfParallelism());
-                       assertEquals(8, sinkNode.getDegreeOfParallelism());
+                       assertEquals(8, reduceNode.getParallelism());
+                       assertEquals(8, keyProjector.getParallelism());
+                       assertEquals(8, sinkNode.getParallelism());
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -351,13 +351,13 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                        assertEquals(new FieldList(0), 
reduceNode.getInput().getLocalStrategyKeys());
                        
                        // check DOP
-                       assertEquals(6, sourceNode.getDegreeOfParallelism());
-                       assertEquals(6, keyExtractor.getDegreeOfParallelism());
-                       assertEquals(6, combineNode.getDegreeOfParallelism());
+                       assertEquals(6, sourceNode.getParallelism());
+                       assertEquals(6, keyExtractor.getParallelism());
+                       assertEquals(6, combineNode.getParallelism());
                        
-                       assertEquals(8, reduceNode.getDegreeOfParallelism());
-                       assertEquals(8, keyProjector.getDegreeOfParallelism());
-                       assertEquals(8, sinkNode.getDegreeOfParallelism());
+                       assertEquals(8, reduceNode.getParallelism());
+                       assertEquals(8, keyProjector.getParallelism());
+                       assertEquals(8, sinkNode.getParallelism());
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
index 54b24dd..37a8e81 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.junit.Test;
 
@@ -53,7 +53,7 @@ public class IterationCompilerTest extends CompilerTestBase {
                        Plan p = env.createProgramPlan();
                        OptimizedPlan op = compileNoStats(p);
                        
-                       new NepheleJobGraphGenerator().compileJobGraph(op);
+                       new JobGraphGenerator().compileJobGraph(op);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -81,7 +81,7 @@ public class IterationCompilerTest extends CompilerTestBase {
                        Plan p = env.createProgramPlan();
                        OptimizedPlan op = compileNoStats(p);
                        
-                       new NepheleJobGraphGenerator().compileJobGraph(op);
+                       new JobGraphGenerator().compileJobGraph(op);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -120,7 +120,7 @@ public class IterationCompilerTest extends CompilerTestBase 
{
                        assertTrue(union.getCostWeight() >= 1);
                        
                        // see that the jobgraph generator can translate this
-                       new NepheleJobGraphGenerator().compileJobGraph(op);
+                       new JobGraphGenerator().compileJobGraph(op);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -179,7 +179,7 @@ public class IterationCompilerTest extends CompilerTestBase 
{
                        assertTrue(solutionDeltaUnion.isOnDynamicPath());
                        assertTrue(solutionDeltaUnion.getCostWeight() >= 1);
                        
-                       new NepheleJobGraphGenerator().compileJobGraph(op);
+                       new JobGraphGenerator().compileJobGraph(op);
                }
                catch (Exception e) {
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
index bbdad4a..0724a9f 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -72,9 +72,9 @@ public class ReduceCompilationTest extends CompilerTestBase 
implements java.io.S
                        assertEquals(reduceNode, 
sinkNode.getInput().getSource());
                        
                        // check DOP
-                       assertEquals(1, sourceNode.getDegreeOfParallelism());
-                       assertEquals(1, reduceNode.getDegreeOfParallelism());
-                       assertEquals(1, sinkNode.getDegreeOfParallelism());
+                       assertEquals(1, sourceNode.getParallelism());
+                       assertEquals(1, reduceNode.getParallelism());
+                       assertEquals(1, sinkNode.getParallelism());
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -122,10 +122,10 @@ public class ReduceCompilationTest extends 
CompilerTestBase implements java.io.S
                        assertEquals(DriverStrategy.ALL_REDUCE, 
combineNode.getDriverStrategy());
                        
                        // check DOP
-                       assertEquals(8, sourceNode.getDegreeOfParallelism());
-                       assertEquals(8, combineNode.getDegreeOfParallelism());
-                       assertEquals(1, reduceNode.getDegreeOfParallelism());
-                       assertEquals(1, sinkNode.getDegreeOfParallelism());
+                       assertEquals(8, sourceNode.getParallelism());
+                       assertEquals(8, combineNode.getParallelism());
+                       assertEquals(1, reduceNode.getParallelism());
+                       assertEquals(1, sinkNode.getParallelism());
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -180,10 +180,10 @@ public class ReduceCompilationTest extends 
CompilerTestBase implements java.io.S
                        assertEquals(new FieldList(1), 
reduceNode.getInput().getLocalStrategyKeys());
                        
                        // check DOP
-                       assertEquals(6, sourceNode.getDegreeOfParallelism());
-                       assertEquals(6, combineNode.getDegreeOfParallelism());
-                       assertEquals(8, reduceNode.getDegreeOfParallelism());
-                       assertEquals(8, sinkNode.getDegreeOfParallelism());
+                       assertEquals(6, sourceNode.getParallelism());
+                       assertEquals(6, combineNode.getParallelism());
+                       assertEquals(8, reduceNode.getParallelism());
+                       assertEquals(8, sinkNode.getParallelism());
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -244,13 +244,13 @@ public class ReduceCompilationTest extends 
CompilerTestBase implements java.io.S
                        assertEquals(new FieldList(0), 
reduceNode.getInput().getLocalStrategyKeys());
                        
                        // check DOP
-                       assertEquals(6, sourceNode.getDegreeOfParallelism());
-                       assertEquals(6, keyExtractor.getDegreeOfParallelism());
-                       assertEquals(6, combineNode.getDegreeOfParallelism());
+                       assertEquals(6, sourceNode.getParallelism());
+                       assertEquals(6, keyExtractor.getParallelism());
+                       assertEquals(6, combineNode.getParallelism());
                        
-                       assertEquals(8, reduceNode.getDegreeOfParallelism());
-                       assertEquals(8, keyProjector.getDegreeOfParallelism());
-                       assertEquals(8, sinkNode.getDegreeOfParallelism());
+                       assertEquals(8, reduceNode.getParallelism());
+                       assertEquals(8, keyProjector.getParallelism());
+                       assertEquals(8, sinkNode.getParallelism());
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
index 658cf7a..8720aa7 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
@@ -37,7 +37,7 @@ import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
@@ -93,7 +93,7 @@ public class WorksetIterationsJavaApiCompilerTest extends 
CompilerTestBase {
                        assertTrue( (ss1 == ShipStrategyType.FORWARD && ss2 == 
ShipStrategyType.PARTITION_HASH) ||
                                                (ss2 == 
ShipStrategyType.FORWARD && ss1 == ShipStrategyType.PARTITION_HASH) );
                
-                       new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+                       new JobGraphGenerator().compileJobGraph(oPlan);
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -137,7 +137,7 @@ public class WorksetIterationsJavaApiCompilerTest extends 
CompilerTestBase {
                        assertEquals(ShipStrategyType.PARTITION_HASH, 
joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
                        assertEquals(ShipStrategyType.PARTITION_HASH, 
joinWithSolutionSetNode.getOutgoingChannels().get(1).getShipStrategy());
                        
-                       new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+                       new JobGraphGenerator().compileJobGraph(oPlan);
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -182,7 +182,7 @@ public class WorksetIterationsJavaApiCompilerTest extends 
CompilerTestBase {
                        assertEquals(1, 
joinWithSolutionSetNode.getOutgoingChannels().size());
                        assertEquals(ShipStrategyType.FORWARD, 
joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
                        
-                       new NepheleJobGraphGenerator().compileJobGraph(oPlan);
+                       new JobGraphGenerator().compileJobGraph(oPlan);
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
index 21d95ee..e7807c9 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
 import org.apache.flink.configuration.Configuration;
 import org.junit.Test;
@@ -42,8 +42,8 @@ public class CoGroupOnConflictingPartitioningsTest extends 
CompilerTestBase {
                        DataSet<Tuple2<Long, Long>> input = 
env.fromElements(new Tuple2<Long, Long>(0L, 0L));
                        
                        Configuration cfg = new Configuration();
-                       
cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT, 
PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH);
-                       
cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT, 
PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
+                       cfg.setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, 
Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
+                       
cfg.setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, 
Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
                        
                        input.coGroup(input).where(0).equalTo(0)
                                .with(new DummyCoGroupFunction<Tuple2<Long, 
Long>, Tuple2<Long, Long>>())

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
index 4cfa189..9171cc7 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.configuration.Configuration;
 import org.junit.Test;
 
@@ -41,8 +41,8 @@ public class JoinOnConflictingPartitioningsTest extends 
CompilerTestBase {
                        DataSet<Tuple2<Long, Long>> input = 
env.fromElements(new Tuple2<Long, Long>(0L, 0L));
                        
                        Configuration cfg = new Configuration();
-                       
cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT, 
PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH);
-                       
cfg.setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT, 
PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
+                       cfg.setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, 
Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
+                       
cfg.setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, 
Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE);
                        
                        input.join(input).where(0).equalTo(0)
                                .withParameters(cfg)

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
 
b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
index 79e04fb..019345f 100644
--- 
a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
+++ 
b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
@@ -68,24 +68,24 @@ public class SpargelCompilerTest extends CompilerTestBase {
                        // check the sink
                        SinkPlanNode sink = op.getDataSinks().iterator().next();
                        assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(DEFAULT_PARALLELISM, 
sink.getDegreeOfParallelism());
+                       assertEquals(DEFAULT_PARALLELISM, 
sink.getParallelism());
                        
                        // check the iteration
                        WorksetIterationPlanNode iteration = 
(WorksetIterationPlanNode) sink.getInput().getSource();
-                       assertEquals(DEFAULT_PARALLELISM, 
iteration.getDegreeOfParallelism());
+                       assertEquals(DEFAULT_PARALLELISM, 
iteration.getParallelism());
                        
                        // check the solution set join and the delta
                        PlanNode ssDelta = 
iteration.getSolutionSetDeltaPlanNode();
                        assertTrue(ssDelta instanceof DualInputPlanNode); // 
this is only true if the update functions preserves the partitioning
                        
                        DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
-                       assertEquals(DEFAULT_PARALLELISM, 
ssJoin.getDegreeOfParallelism());
+                       assertEquals(DEFAULT_PARALLELISM, 
ssJoin.getParallelism());
                        assertEquals(ShipStrategyType.PARTITION_HASH, 
ssJoin.getInput1().getShipStrategy());
                        assertEquals(new FieldList(0), 
ssJoin.getInput1().getShipStrategyKeys());
                        
                        // check the workset set join
                        DualInputPlanNode edgeJoin = (DualInputPlanNode) 
ssJoin.getInput1().getSource();
-                       assertEquals(DEFAULT_PARALLELISM, 
edgeJoin.getDegreeOfParallelism());
+                       assertEquals(DEFAULT_PARALLELISM, 
edgeJoin.getParallelism());
                        assertEquals(ShipStrategyType.PARTITION_HASH, 
edgeJoin.getInput1().getShipStrategy());
                        assertEquals(ShipStrategyType.FORWARD, 
edgeJoin.getInput2().getShipStrategy());
                        
assertTrue(edgeJoin.getInput1().getTempMode().isCached());
@@ -143,24 +143,24 @@ public class SpargelCompilerTest extends CompilerTestBase 
{
                        // check the sink
                        SinkPlanNode sink = op.getDataSinks().iterator().next();
                        assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(DEFAULT_PARALLELISM, 
sink.getDegreeOfParallelism());
+                       assertEquals(DEFAULT_PARALLELISM, 
sink.getParallelism());
                        
                        // check the iteration
                        WorksetIterationPlanNode iteration = 
(WorksetIterationPlanNode) sink.getInput().getSource();
-                       assertEquals(DEFAULT_PARALLELISM, 
iteration.getDegreeOfParallelism());
+                       assertEquals(DEFAULT_PARALLELISM, 
iteration.getParallelism());
                        
                        // check the solution set join and the delta
                        PlanNode ssDelta = 
iteration.getSolutionSetDeltaPlanNode();
                        assertTrue(ssDelta instanceof DualInputPlanNode); // 
this is only true if the update functions preserves the partitioning
                        
                        DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
-                       assertEquals(DEFAULT_PARALLELISM, 
ssJoin.getDegreeOfParallelism());
+                       assertEquals(DEFAULT_PARALLELISM, 
ssJoin.getParallelism());
                        assertEquals(ShipStrategyType.PARTITION_HASH, 
ssJoin.getInput1().getShipStrategy());
                        assertEquals(new FieldList(0), 
ssJoin.getInput1().getShipStrategyKeys());
                        
                        // check the workset set join
                        DualInputPlanNode edgeJoin = (DualInputPlanNode) 
ssJoin.getInput1().getSource();
-                       assertEquals(DEFAULT_PARALLELISM, 
edgeJoin.getDegreeOfParallelism());
+                       assertEquals(DEFAULT_PARALLELISM, 
edgeJoin.getParallelism());
                        assertEquals(ShipStrategyType.PARTITION_HASH, 
edgeJoin.getInput1().getShipStrategy());
                        assertEquals(ShipStrategyType.FORWARD, 
edgeJoin.getInput2().getShipStrategy());
                        
assertTrue(edgeJoin.getInput1().getTempMode().isCached());

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
 
b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
index acf20d3..788327a 100644
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
+++ 
b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
@@ -30,7 +30,7 @@ import 
org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.PlanNode;
@@ -57,9 +57,9 @@ public abstract class CompilerTestBase {
        
        protected DataStatistics dataStats;
        
-       protected PactCompiler withStatsCompiler;
+       protected Optimizer withStatsCompiler;
        
-       protected PactCompiler noStatsCompiler;
+       protected Optimizer noStatsCompiler;
        
        private int statCounter;
        
@@ -68,10 +68,10 @@ public abstract class CompilerTestBase {
        @Before
        public void setup() {
                this.dataStats = new DataStatistics();
-               this.withStatsCompiler = new PactCompiler(this.dataStats, new 
DefaultCostEstimator());
+               this.withStatsCompiler = new Optimizer(this.dataStats, new 
DefaultCostEstimator());
                
this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
                
-               this.noStatsCompiler = new PactCompiler(null, new 
DefaultCostEstimator());
+               this.noStatsCompiler = new Optimizer(null, new 
DefaultCostEstimator());
                
this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
        }
        
@@ -113,7 +113,7 @@ public abstract class CompilerTestBase {
                        HashMap<String, ArrayList<PlanNode>> map = new 
HashMap<String, ArrayList<PlanNode>>();
                        
                        for (PlanNode n : p.getAllNodes()) {
-                               Operator<?> c = 
n.getOriginalOptimizerNode().getPactContract();
+                               Operator<?> c = 
n.getOriginalOptimizerNode().getOperator();
                                String name = c.getName();
                                
                                ArrayList<PlanNode> list = map.get(name);
@@ -126,7 +126,7 @@ public abstract class CompilerTestBase {
                                boolean shouldAdd = true;
                                for (Iterator<PlanNode> iter = list.iterator(); 
iter.hasNext();) {
                                        PlanNode in = iter.next();
-                                       if 
(in.getOriginalOptimizerNode().getPactContract() == c) {
+                                       if 
(in.getOriginalOptimizerNode().getOperator() == c) {
                                                // is this the child or is our 
node the child
                                                if (in instanceof 
SingleInputPlanNode && n instanceof SingleInputPlanNode) {
                                                        SingleInputPlanNode 
thisNode = (SingleInputPlanNode) n;

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
index 7cd3ff0..67a4797 100644
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
+++ 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
@@ -23,10 +23,10 @@ import akka.actor.ActorRef;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -72,14 +72,14 @@ public abstract class RecordAPITestBase extends 
AbstractTestBase {
                        Assert.fail("Error: Cannot obtain Pact plan. Did the 
thest forget to override either 'getPactPlan()' or 'getJobGraph()' ?");
                }
                
-               PactCompiler pc = new PactCompiler(new DataStatistics());
+               Optimizer pc = new Optimizer(new DataStatistics());
                OptimizedPlan op = pc.compile(p);
                
                if (printPlan) {
                        System.out.println(new 
PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op)); 
                }
 
-               NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+               JobGraphGenerator jgg = new JobGraphGenerator();
                return jgg.compileJobGraph(op);
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index db1fc4d..44f35e7 100644
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -24,10 +24,10 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.junit.Assert;
@@ -49,7 +49,7 @@ public class TestEnvironment extends ExecutionEnvironment {
                try {
                        OptimizedPlan op = compileProgram(jobName);
 
-                       NepheleJobGraphGenerator jgg = new 
NepheleJobGraphGenerator();
+                       JobGraphGenerator jgg = new JobGraphGenerator();
                        JobGraph jobGraph = jgg.compileJobGraph(op);
 
                        ActorRef client = this.executor.getJobClient();
@@ -80,7 +80,7 @@ public class TestEnvironment extends ExecutionEnvironment {
        private OptimizedPlan compileProgram(String jobName) {
                Plan p = createProgramPlan(jobName);
 
-               PactCompiler pc = new PactCompiler(new DataStatistics());
+               Optimizer pc = new Optimizer(new DataStatistics());
                return pc.compile(p);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
index 9ab62e1..400ed3a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
@@ -38,9 +38,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
@@ -140,9 +140,9 @@ public abstract class CancellingTestBase {
        }
 
        private JobGraph getJobGraph(final Plan plan) throws Exception {
-               final PactCompiler pc = new PactCompiler(new DataStatistics());
+               final Optimizer pc = new Optimizer(new DataStatistics());
                final OptimizedPlan op = pc.compile(plan);
-               final NepheleJobGraphGenerator jgg = new 
NepheleJobGraphGenerator();
+               final JobGraphGenerator jgg = new JobGraphGenerator();
                return jgg.compileJobGraph(op);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java
index 1b73c4b..b38b784 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsCoGroupTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plan.SourcePlanNode;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -130,7 +130,7 @@ public class ConnectedComponentsCoGroupTest extends 
CompilerTestBase {
                // check the caches
                Assert.assertTrue(TempMode.CACHED == 
neighborsJoin.getInput2().getTempMode());
                
-               NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+               JobGraphGenerator jgg = new JobGraphGenerator();
                jgg.compileJobGraph(optPlan);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
index 2c23eaf..dcc9c15 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
@@ -40,7 +40,7 @@ import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plan.SourcePlanNode;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -152,7 +152,7 @@ public class ConnectedComponentsTest extends 
CompilerTestBase {
                Assert.assertTrue(TempMode.PIPELINE_BREAKER == 
iter.getInitialWorksetInput().getTempMode() ||
                                                        LocalStrategy.SORT == 
iter.getInitialWorksetInput().getLocalStrategy());
                
-               NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+               JobGraphGenerator jgg = new JobGraphGenerator();
                jgg.compileJobGraph(optPlan);
        }
        
@@ -233,7 +233,7 @@ public class ConnectedComponentsTest extends 
CompilerTestBase {
                Assert.assertTrue(TempMode.PIPELINE_BREAKER == 
iter.getInitialWorksetInput().getTempMode() ||
                                                        LocalStrategy.SORT == 
iter.getInitialWorksetInput().getLocalStrategy());
                
-               NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+               JobGraphGenerator jgg = new JobGraphGenerator();
                jgg.compileJobGraph(optPlan);
        }
        

Reply via email to