http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
index 85f1aa7..e4b35b7 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
@@ -38,7 +38,7 @@ import 
org.apache.flink.api.common.operators.SemanticProperties;
 import 
org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.CostEstimator;
 import org.apache.flink.optimizer.costs.Costs;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -105,8 +105,8 @@ public class DataSourceNode extends OptimizerNode {
         * @return The contract.
         */
        @Override
-       public GenericDataSourceBase<?, ?> getPactContract() {
-               return (GenericDataSourceBase<?, ?>) super.getPactContract();
+       public GenericDataSourceBase<?, ?> getOperator() {
+               return (GenericDataSourceBase<?, ?>) super.getOperator();
        }
 
        @Override
@@ -123,8 +123,8 @@ public class DataSourceNode extends OptimizerNode {
        }
 
        @Override
-       public List<PactConnection> getIncomingConnections() {
-               return Collections.<PactConnection>emptyList();
+       public List<DagConnection> getIncomingConnections() {
+               return Collections.<DagConnection>emptyList();
        }
 
        @Override
@@ -139,13 +139,13 @@ public class DataSourceNode extends OptimizerNode {
                        String inFormatDescription = "<unknown>";
                        
                        try {
-                               format = 
getPactContract().getFormatWrapper().getUserCodeObject();
-                               Configuration config = 
getPactContract().getParameters();
+                               format = 
getOperator().getFormatWrapper().getUserCodeObject();
+                               Configuration config = 
getOperator().getParameters();
                                format.configure(config);
                        }
                        catch (Throwable t) {
-                               if (PactCompiler.LOG.isWarnEnabled()) {
-                                       PactCompiler.LOG.warn("Could not 
instantiate InputFormat to obtain statistics."
+                               if (Optimizer.LOG.isWarnEnabled()) {
+                                       Optimizer.LOG.warn("Could not 
instantiate InputFormat to obtain statistics."
                                                + " Limited statistics will be 
available.", t);
                                }
                                return;
@@ -158,7 +158,7 @@ public class DataSourceNode extends OptimizerNode {
                        }
                        
                        // first of all, get the statistics from the cache
-                       final String statisticsKey = 
getPactContract().getStatisticsKey();
+                       final String statisticsKey = 
getOperator().getStatisticsKey();
                        final BaseStatistics cachedStatistics = 
statistics.getBaseStatistics(statisticsKey);
                        
                        BaseStatistics bs = null;
@@ -166,16 +166,16 @@ public class DataSourceNode extends OptimizerNode {
                                bs = format.getStatistics(cachedStatistics);
                        }
                        catch (Throwable t) {
-                               if (PactCompiler.LOG.isWarnEnabled()) {
-                                       PactCompiler.LOG.warn("Error obtaining 
statistics from input format: " + t.getMessage(), t);
+                               if (Optimizer.LOG.isWarnEnabled()) {
+                                       Optimizer.LOG.warn("Error obtaining 
statistics from input format: " + t.getMessage(), t);
                                }
                        }
                        
                        if (bs != null) {
                                final long len = bs.getTotalInputSize();
                                if (len == BaseStatistics.SIZE_UNKNOWN) {
-                                       if (PactCompiler.LOG.isInfoEnabled()) {
-                                               PactCompiler.LOG.info("Compiler 
could not determine the size of input '" + inFormatDescription + "'. Using 
default estimates.");
+                                       if (Optimizer.LOG.isInfoEnabled()) {
+                                               Optimizer.LOG.info("Compiler 
could not determine the size of input '" + inFormatDescription + "'. Using 
default estimates.");
                                        }
                                }
                                else if (len >= 0) {
@@ -207,14 +207,14 @@ public class DataSourceNode extends OptimizerNode {
                        return this.cachedPlans;
                }
 
-               SourcePlanNode candidate = new SourcePlanNode(this, "DataSource 
("+this.getPactContract().getName()+")",
+               SourcePlanNode candidate = new SourcePlanNode(this, "DataSource 
("+this.getOperator().getName()+")",
                                this.gprops, this.lprops);
 
                if(!replicatedInput) {
                        
candidate.updatePropertiesWithUniqueSets(getUniqueFields());
 
                        final Costs costs = new Costs();
-                       if 
(FileInputFormat.class.isAssignableFrom(getPactContract().getFormatWrapper().getUserCodeClass())
 &&
+                       if 
(FileInputFormat.class.isAssignableFrom(getOperator().getFormatWrapper().getUserCodeClass())
 &&
                                        this.estimatedOutputSize >= 0) {
                                
estimator.addFileInputCost(this.estimatedOutputSize, costs);
                        }
@@ -223,10 +223,10 @@ public class DataSourceNode extends OptimizerNode {
                        // replicated input
                        final Costs costs = new Costs();
                        InputFormat<?,?> inputFormat =
-                                       
((ReplicatingInputFormat<?,?>)getPactContract().getFormatWrapper().getUserCodeObject()).getReplicatedInputFormat();
+                                       ((ReplicatingInputFormat<?,?>) 
getOperator().getFormatWrapper().getUserCodeObject()).getReplicatedInputFormat();
                        if 
(FileInputFormat.class.isAssignableFrom(inputFormat.getClass()) &&
                                        this.estimatedOutputSize >= 0) {
-                               
estimator.addFileInputCost(this.estimatedOutputSize * 
this.getDegreeOfParallelism(), costs);
+                               
estimator.addFileInputCost(this.estimatedOutputSize * this.getParallelism(), 
costs);
                        }
                        candidate.setCosts(costs);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
index 5370678..482951b 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
@@ -16,10 +16,12 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer.dag;
 
-
+/**
+ * Methods for operators / connections that provide estimated about data size 
and
+ * characteristics.
+ */
 public interface EstimateProvider {
        
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
index c8dda12..118ddc8 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
@@ -41,8 +41,8 @@ public class FilterNode extends SingleInputNode {
        }
 
        @Override
-       public FilterOperatorBase<?, ?> getPactContract() {
-               return (FilterOperatorBase<?, ?>) super.getPactContract();
+       public FilterOperatorBase<?, ?> getOperator() {
+               return (FilterOperatorBase<?, ?>) super.getOperator();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
index bb5799c..f713d56 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
@@ -40,8 +40,8 @@ public class FlatMapNode extends SingleInputNode {
        }
 
        @Override
-       public FlatMapOperatorBase<?, ?, ?> getPactContract() {
-               return (FlatMapOperatorBase<?, ?, ?>) super.getPactContract();
+       public FlatMapOperatorBase<?, ?, ?> getOperator() {
+               return (FlatMapOperatorBase<?, ?, ?>) super.getOperator();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
index 2e2576d..564c0d3 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
@@ -54,7 +54,7 @@ public class GroupCombineNode extends SingleInputNode {
        private List<OperatorDescriptorSingle> initPossibleProperties() {
 
                // check if we can work with a grouping (simple reducer), or if 
we need ordering because of a group order
-               Ordering groupOrder = getPactContract().getGroupOrder();
+               Ordering groupOrder = getOperator().getGroupOrder();
                if (groupOrder != null && groupOrder.getNumberOfFields() == 0) {
                        groupOrder = null;
                }
@@ -74,8 +74,8 @@ public class GroupCombineNode extends SingleInputNode {
         * @return The operator represented by this optimizer node.
         */
        @Override
-       public GroupCombineOperatorBase<?, ?, ?> getPactContract() {
-               return (GroupCombineOperatorBase<?, ?, ?>) 
super.getPactContract();
+       public GroupCombineOperatorBase<?, ?, ?> getOperator() {
+               return (GroupCombineOperatorBase<?, ?, ?>) super.getOperator();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
index 88ec53a..77acae5 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.operators.AllGroupReduceProperties;
 import 
org.apache.flink.optimizer.operators.AllGroupWithPartialPreGroupProperties;
 import org.apache.flink.optimizer.operators.GroupReduceProperties;
@@ -67,17 +67,17 @@ public class GroupReduceNode extends SingleInputNode {
        
        private List<OperatorDescriptorSingle> 
initPossibleProperties(Partitioner<?> customPartitioner) {
                // see if an internal hint dictates the strategy to use
-               final Configuration conf = getPactContract().getParameters();
-               final String localStrategy = 
conf.getString(PactCompiler.HINT_LOCAL_STRATEGY, null);
+               final Configuration conf = getOperator().getParameters();
+               final String localStrategy = 
conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
 
                final boolean useCombiner;
                if (localStrategy != null) {
-                       if 
(PactCompiler.HINT_LOCAL_STRATEGY_SORT.equals(localStrategy)) {
+                       if 
(Optimizer.HINT_LOCAL_STRATEGY_SORT.equals(localStrategy)) {
                                useCombiner = false;
                        }
-                       else if 
(PactCompiler.HINT_LOCAL_STRATEGY_COMBINING_SORT.equals(localStrategy)) {
+                       else if 
(Optimizer.HINT_LOCAL_STRATEGY_COMBINING_SORT.equals(localStrategy)) {
                                if (!isCombineable()) {
-                                       PactCompiler.LOG.warn("Strategy hint 
for GroupReduce '" + getPactContract().getName() + 
+                                       Optimizer.LOG.warn("Strategy hint for 
GroupReduce '" + getOperator().getName() +
                                                "' requires combinable reduce, 
but user function is not marked combinable.");
                                }
                                useCombiner = true;
@@ -90,8 +90,8 @@ public class GroupReduceNode extends SingleInputNode {
                
                // check if we can work with a grouping (simple reducer), or if 
we need ordering because of a group order
                Ordering groupOrder = null;
-               if (getPactContract() instanceof GroupReduceOperatorBase) {
-                       groupOrder = ((GroupReduceOperatorBase<?, ?, ?>) 
getPactContract()).getGroupOrder();
+               if (getOperator() instanceof GroupReduceOperatorBase) {
+                       groupOrder = ((GroupReduceOperatorBase<?, ?, ?>) 
getOperator()).getGroupOrder();
                        if (groupOrder != null && 
groupOrder.getNumberOfFields() == 0) {
                                groupOrder = null;
                        }
@@ -112,8 +112,8 @@ public class GroupReduceNode extends SingleInputNode {
         * @return The operator represented by this optimizer node.
         */
        @Override
-       public GroupReduceOperatorBase<?, ?, ?> getPactContract() {
-               return (GroupReduceOperatorBase<?, ?, ?>) 
super.getPactContract();
+       public GroupReduceOperatorBase<?, ?, ?> getOperator() {
+               return (GroupReduceOperatorBase<?, ?, ?>) super.getOperator();
        }
 
        /**
@@ -123,7 +123,7 @@ public class GroupReduceNode extends SingleInputNode {
         * @return True, if a combiner has been given, false otherwise.
         */
        public boolean isCombineable() {
-               return getPactContract().isCombinable();
+               return getOperator().isCombinable();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
index 3c67108..cbd58ca 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
@@ -27,7 +27,7 @@ import 
org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.operators.AbstractJoinDescriptor;
 import org.apache.flink.optimizer.operators.HashJoinBuildFirstProperties;
 import org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties;
@@ -62,8 +62,8 @@ public class JoinNode extends TwoInputNode {
         * @return The contract.
         */
        @Override
-       public JoinOperatorBase<?, ?, ?, ?> getPactContract() {
-               return (JoinOperatorBase<?, ?, ?, ?>) super.getPactContract();
+       public JoinOperatorBase<?, ?, ?, ?> getOperator() {
+               return (JoinOperatorBase<?, ?, ?, ?>) super.getOperator();
        }
 
        @Override
@@ -116,21 +116,21 @@ public class JoinNode extends TwoInputNode {
        {
                // see if an internal hint dictates the strategy to use
                Configuration conf = joinOperatorBase.getParameters();
-               String localStrategy = 
conf.getString(PactCompiler.HINT_LOCAL_STRATEGY, null);
+               String localStrategy = 
conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
 
                if (localStrategy != null) {
                        final AbstractJoinDescriptor fixedDriverStrat;
-                       if 
(PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) ||
-                               
PactCompiler.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) ||
-                               
PactCompiler.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) ||
-                               
PactCompiler.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) )
+                       if 
(Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) ||
+                               
Optimizer.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) ||
+                               
Optimizer.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) ||
+                               
Optimizer.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) )
                        {
                                fixedDriverStrat = new 
SortMergeJoinDescriptor(this.keys1, this.keys2);
                        }
-                       else if 
(PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) {
+                       else if 
(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) {
                                fixedDriverStrat = new 
HashJoinBuildFirstProperties(this.keys1, this.keys2);
                        }
-                       else if 
(PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) {
+                       else if 
(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) {
                                fixedDriverStrat = new 
HashJoinBuildSecondProperties(this.keys1, this.keys2);
                        }
                        else {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
index cbcab2b..de3cd22 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
@@ -26,7 +26,7 @@ import 
org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.operators.HashJoinBuildFirstProperties;
 import org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties;
 import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
@@ -58,8 +58,8 @@ public class MatchNode extends TwoInputNode {
         * @return The contract.
         */
        @Override
-       public JoinOperatorBase<?, ?, ?, ?> getPactContract() {
-               return (JoinOperatorBase<?, ?, ?, ?>) super.getPactContract();
+       public JoinOperatorBase<?, ?, ?, ?> getOperator() {
+               return (JoinOperatorBase<?, ?, ?, ?>) super.getOperator();
        }
 
        @Override
@@ -110,19 +110,19 @@ public class MatchNode extends TwoInputNode {
        private List<OperatorDescriptorDual> 
getDataProperties(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint 
joinHint) {
                // see if an internal hint dictates the strategy to use
                Configuration conf = joinOperatorBase.getParameters();
-               String localStrategy = 
conf.getString(PactCompiler.HINT_LOCAL_STRATEGY, null);
+               String localStrategy = 
conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
 
                if (localStrategy != null) {
                        final OperatorDescriptorDual fixedDriverStrat;
-                       if 
(PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) ||
-                               
PactCompiler.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) ||
-                               
PactCompiler.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) ||
-                               
PactCompiler.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) )
+                       if 
(Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) ||
+                               
Optimizer.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) ||
+                               
Optimizer.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) ||
+                               
Optimizer.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) )
                        {
                                fixedDriverStrat = new 
SortMergeJoinDescriptor(this.keys1, this.keys2);
-                       } else if 
(PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) {
+                       } else if 
(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) {
                                fixedDriverStrat = new 
HashJoinBuildFirstProperties(this.keys1, this.keys2);
-                       } else if 
(PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) {
+                       } else if 
(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) {
                                fixedDriverStrat = new 
HashJoinBuildSecondProperties(this.keys1, this.keys2);
                        } else {
                                throw new CompilerException("Invalid local 
strategy hint for match contract: " + localStrategy);

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
index baeb1f7..0cad34e 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer.dag;
 
 import java.util.ArrayList;
@@ -48,8 +47,19 @@ import org.apache.flink.util.Visitable;
 import org.apache.flink.util.Visitor;
 
 /**
- * This class represents a node in the optimizer's internal representation of 
the PACT plan. It contains
- * extra information about estimates, hints and data properties.
+ * The OptimizerNode is the base class of all nodes in the optimizer DAG. The 
optimizer DAG is the
+ * optimizer's representation of a program, created before the actual 
optimization (which creates different
+ * candidate plans and computes their cost).
+ * <p>>
+ * Nodes in the DAG correspond (almost) one-to-one to the operators in a 
program. The optimizer DAG is constructed
+ * to hold the additional information that the optimizer needs:
+ * <ul>
+ *     <li>Estimates of the data size processed by each operator</li>
+ *     <li>Helper structures to track where the data flow "splits" and 
"joins", to support flows that are
+ *         DAGs but not trees.</li>
+ *     <li>Tags and weights to differentiate between loop-variant and 
-invariant parts of an iteration</li>
+ *     <li>Interesting properties to be used during the enumeration of 
candidate plans</li>
+ * </ul>
  */
 public abstract class OptimizerNode implements Visitable<OptimizerNode>, 
EstimateProvider, DumpableNode<OptimizerNode> {
        
@@ -59,13 +69,13 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
        //                                      Members
        // 
--------------------------------------------------------------------------------------------
 
-       private final Operator<?> pactContract; // The operator (Reduce / Join 
/ DataSource / ...)
+       private final Operator<?> operator; // The operator (Reduce / Join / 
DataSource / ...)
        
        private List<String> broadcastConnectionNames = new 
ArrayList<String>(); // the broadcast inputs names of this node
        
-       private List<PactConnection> broadcastConnections = new 
ArrayList<PactConnection>(); // the broadcast inputs of this node
+       private List<DagConnection> broadcastConnections = new 
ArrayList<DagConnection>(); // the broadcast inputs of this node
        
-       private List<PactConnection> outgoingConnections; // The links to 
succeeding nodes
+       private List<DagConnection> outgoingConnections; // The links to 
succeeding nodes
 
        private InterestingProperties intProps; // the interesting properties 
of this node
        
@@ -88,7 +98,7 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
 
        // --------------------------------- General Parameters 
---------------------------------------
        
-       private int degreeOfParallelism = -1; // the number of parallel 
instances of this node
+       private int parallelism = -1; // the number of parallel instances of 
this node
        
        private long minimalMemoryPerSubTask = -1;
 
@@ -105,18 +115,17 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
        // 
------------------------------------------------------------------------
 
        /**
-        * Creates a new node for the optimizer plan.
+        * Creates a new optimizer node that represents the given program 
operator.
         * 
         * @param op The operator that the node represents.
         */
        public OptimizerNode(Operator<?> op) {
-               this.pactContract = op;
+               this.operator = op;
                readStubAnnotations();
        }
        
        protected OptimizerNode(OptimizerNode toCopy) {
-               this.pactContract = toCopy.pactContract;
-               
+               this.operator = toCopy.operator;
                this.intProps = toCopy.intProps;
                
                this.openBranches = toCopy.openBranches;
@@ -125,7 +134,7 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
                this.estimatedOutputSize = toCopy.estimatedOutputSize;
                this.estimatedNumRecords = toCopy.estimatedNumRecords;
                
-               this.degreeOfParallelism = toCopy.degreeOfParallelism;
+               this.parallelism = toCopy.parallelism;
                this.minimalMemoryPerSubTask = toCopy.minimalMemoryPerSubTask;
                
                this.id = toCopy.id;
@@ -156,7 +165,7 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
                                                                        
ExecutionMode defaultExchangeMode);
 
        /**
-        * This function connects the predecessors to this operator.
+        * This function connects the operators that produce the broadcast 
inputs to this operator.
         *
         * @param operatorToNode The map from program operators to optimizer 
nodes.
         * @param defaultExchangeMode The data exchange mode to use, if the 
operator does not
@@ -164,21 +173,19 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
         *
         * @throws CompilerException
         */
-       public void setBroadcastInputs(Map<Operator<?>, OptimizerNode> 
operatorToNode, ExecutionMode defaultExchangeMode)
-                       throws CompilerException
-       {
+       public void setBroadcastInputs(Map<Operator<?>, OptimizerNode> 
operatorToNode, ExecutionMode defaultExchangeMode) {
                // skip for Operators that don't support broadcast variables 
-               if (!(getPactContract() instanceof AbstractUdfOperator<?, ?>)) {
+               if (!(getOperator() instanceof AbstractUdfOperator<?, ?>)) {
                        return;
                }
 
                // get all broadcast inputs
-               AbstractUdfOperator<?, ?> operator = ((AbstractUdfOperator<?, 
?>) getPactContract());
+               AbstractUdfOperator<?, ?> operator = ((AbstractUdfOperator<?, 
?>) getOperator());
 
                // create connections and add them
                for (Map.Entry<String, Operator<?>> input : 
operator.getBroadcastInputs().entrySet()) {
                        OptimizerNode predecessor = 
operatorToNode.get(input.getValue());
-                       PactConnection connection = new 
PactConnection(predecessor, this,
+                       DagConnection connection = new 
DagConnection(predecessor, this,
                                                                                
                                        ShipStrategyType.BROADCAST, 
defaultExchangeMode);
                        addBroadcastConnection(input.getKey(), connection);
                        predecessor.addOutgoingConnection(connection);
@@ -186,11 +193,12 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
        }
 
        /**
+        * Gets all incoming connections of this node.
         * This method needs to be overridden by subclasses to return the 
children.
         * 
         * @return The list of incoming connections.
         */
-       public abstract List<PactConnection> getIncomingConnections();
+       public abstract List<DagConnection> getIncomingConnections();
 
        /**
         * Tells the node to compute the interesting properties for its inputs. 
The interesting properties
@@ -215,7 +223,7 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
                                                                                
                                        List<UnclosedBranchDescriptor> 
branchesSoFar)
        {
                // handle the data flow branching for the broadcast inputs
-               for (PactConnection broadcastInput : getBroadcastConnections()) 
{
+               for (DagConnection broadcastInput : getBroadcastConnections()) {
                        OptimizerNode bcSource = broadcastInput.getSource();
                        addClosedBranches(bcSource.closedBranchingNodes);
                        
@@ -263,11 +271,11 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
        public Iterable<OptimizerNode> getPredecessors() {
                List<OptimizerNode> allPredecessors = new 
ArrayList<OptimizerNode>();
 
-               for (PactConnection pactConnection : getIncomingConnections()) {
-                       allPredecessors.add(pactConnection.getSource());
+               for (DagConnection dagConnection : getIncomingConnections()) {
+                       allPredecessors.add(dagConnection.getSource());
                }
                
-               for (PactConnection conn : getBroadcastConnections()) {
+               for (DagConnection conn : getBroadcastConnections()) {
                        allPredecessors.add(conn.getSource());
                }
                
@@ -306,7 +314,7 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
         * 
         * @param broadcastConnection The connection to add.
         */
-       public void addBroadcastConnection(String name, PactConnection 
broadcastConnection) {
+       public void addBroadcastConnection(String name, DagConnection 
broadcastConnection) {
                this.broadcastConnectionNames.add(name);
                this.broadcastConnections.add(broadcastConnection);
        }
@@ -321,26 +329,26 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
        /**
         * Return the list of inputs associated with broadcast variables for 
this node.
         */
-       public List<PactConnection> getBroadcastConnections() {
+       public List<DagConnection> getBroadcastConnections() {
                return this.broadcastConnections;
        }
 
        /**
         * Adds a new outgoing connection to this node.
         * 
-        * @param pactConnection
+        * @param connection
         *        The connection to add.
         */
-       public void addOutgoingConnection(PactConnection pactConnection) {
+       public void addOutgoingConnection(DagConnection connection) {
                if (this.outgoingConnections == null) {
-                       this.outgoingConnections = new 
ArrayList<PactConnection>();
+                       this.outgoingConnections = new 
ArrayList<DagConnection>();
                } else {
                        if (this.outgoingConnections.size() == 64) {
                                throw new CompilerException("Cannot currently 
handle nodes with more than 64 outputs.");
                        }
                }
 
-               this.outgoingConnections.add(pactConnection);
+               this.outgoingConnections.add(connection);
        }
 
        /**
@@ -348,7 +356,7 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
         * 
         * @return The list of outgoing connections.
         */
-       public List<PactConnection> getOutgoingConnections() {
+       public List<DagConnection> getOutgoingConnections() {
                return this.outgoingConnections;
        }
 
@@ -357,8 +365,8 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
         * 
         * @return This node's operator.
         */
-       public Operator<?> getPactContract() {
-               return this.pactContract;
+       public Operator<?> getOperator() {
+               return this.operator;
        }
 
        /**
@@ -369,8 +377,8 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
         * 
         * @return The parallelism of the operator.
         */
-       public int getDegreeOfParallelism() {
-               return this.degreeOfParallelism;
+       public int getParallelism() {
+               return this.parallelism;
        }
 
        /**
@@ -386,7 +394,7 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
                if (parallelism < 1 && parallelism != -1) {
                        throw new IllegalArgumentException("Degree of 
parallelism of " + parallelism + " is invalid.");
                }
-               this.degreeOfParallelism = parallelism;
+               this.parallelism = parallelism;
        }
        
        /**
@@ -395,7 +403,7 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
         * @return The total amount of memory across all subtasks.
         */
        public long getMinimalMemoryAcrossAllSubTasks() {
-               return this.minimalMemoryPerSubTask == -1 ? -1 : 
this.minimalMemoryPerSubTask * this.degreeOfParallelism;
+               return this.minimalMemoryPerSubTask == -1 ? -1 : 
this.minimalMemoryPerSubTask * this.parallelism;
        }
        
        public boolean isOnDynamicPath() {
@@ -406,13 +414,13 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
                boolean anyDynamic = false;
                boolean allDynamic = true;
                
-               for (PactConnection conn : getIncomingConnections()) {
+               for (DagConnection conn : getIncomingConnections()) {
                        boolean dynamicIn = conn.isOnDynamicPath();
                        anyDynamic |= dynamicIn;
                        allDynamic &= dynamicIn;
                }
                
-               for (PactConnection conn : getBroadcastConnections()) {
+               for (DagConnection conn : getBroadcastConnections()) {
                        boolean dynamicIn = conn.isOnDynamicPath();
                        anyDynamic |= dynamicIn;
                        allDynamic &= dynamicIn;
@@ -424,7 +432,7 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
                        if (!allDynamic) {
                                // this node joins static and dynamic path.
                                // mark the connections where the source is not 
dynamic as cached
-                               for (PactConnection conn : 
getIncomingConnections()) {
+                               for (DagConnection conn : 
getIncomingConnections()) {
                                        if 
(!conn.getSource().isOnDynamicPath()) {
                                                
conn.setMaterializationMode(conn.getMaterializationMode().makeCached());
                                        }
@@ -442,10 +450,10 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
        
        public int getMaxDepth() {
                int maxDepth = 0;
-               for (PactConnection conn : getIncomingConnections()) {
+               for (DagConnection conn : getIncomingConnections()) {
                        maxDepth = Math.max(maxDepth, conn.getMaxDepth());
                }
-               for (PactConnection conn : getBroadcastConnections()) {
+               for (DagConnection conn : getBroadcastConnections()) {
                        maxDepth = Math.max(maxDepth, conn.getMaxDepth());
                }
                
@@ -502,7 +510,7 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
                if (this.outgoingConnections == null) {
                        throw new IllegalStateException("The outgoing 
connections have not yet been initialized.");
                }
-               for (PactConnection conn : getOutgoingConnections()) {
+               for (DagConnection conn : getOutgoingConnections()) {
                        conn.markBreaksPipeline();
                }
        }
@@ -517,7 +525,7 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
         * @return True, if on all outgoing connections, the interesting 
properties are set. False otherwise.
         */
        public boolean haveAllOutputConnectionInterestingProperties() {
-               for (PactConnection conn : getOutgoingConnections()) {
+               for (DagConnection conn : getOutgoingConnections()) {
                        if (conn.getInterestingProperties() == null) {
                                return false;
                        }
@@ -535,7 +543,7 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
         * leaves the original objects, contained by the connections, unchanged.
         */
        public void computeUnionOfInterestingPropertiesFromSuccessors() {
-               List<PactConnection> conns = getOutgoingConnections();
+               List<DagConnection> conns = getOutgoingConnections();
                if (conns.size() == 0) {
                        // no incoming, we have none ourselves
                        this.intProps = new InterestingProperties();
@@ -550,10 +558,10 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
        
        public void clearInterestingProperties() {
                this.intProps = null;
-               for (PactConnection conn : getIncomingConnections()) {
+               for (DagConnection conn : getIncomingConnections()) {
                        conn.clearInterestingProperties();
                }
-               for (PactConnection conn : getBroadcastConnections()) {
+               for (DagConnection conn : getBroadcastConnections()) {
                        conn.clearInterestingProperties();
                }
        }
@@ -570,7 +578,7 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
         */
        public void computeOutputEstimates(DataStatistics statistics) {
                // sanity checking
-               for (PactConnection c : getIncomingConnections()) {
+               for (DagConnection c : getIncomingConnections()) {
                        if (c.getSource() == null) {
                                throw new CompilerException("Bug: Estimate 
computation called before inputs have been set.");
                        }
@@ -587,11 +595,11 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
                }
                
                // overwrite default estimates with hints, if given
-               if (getPactContract() == null || 
getPactContract().getCompilerHints() == null) {
+               if (getOperator() == null || getOperator().getCompilerHints() 
== null) {
                        return ;
                }
                
-               CompilerHints hints = getPactContract().getCompilerHints();
+               CompilerHints hints = getOperator().getCompilerHints();
                if (hints.getOutputSize() >= 0) {
                        this.estimatedOutputSize = hints.getOutputSize();
                }
@@ -643,8 +651,8 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
        }
        
        protected void readUniqueFieldsAnnotation() {
-               if (this.pactContract.getCompilerHints() != null) {
-                       Set<FieldSet> uniqueFieldSets = 
pactContract.getCompilerHints().getUniqueFields();
+               if (this.operator.getCompilerHints() != null) {
+                       Set<FieldSet> uniqueFieldSets = 
operator.getCompilerHints().getUniqueFields();
                        if (uniqueFieldSets != null) {
                                if (this.uniqueFields == null) {
                                        this.uniqueFields = new 
HashSet<FieldSet>();
@@ -686,7 +694,7 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
                
                // for pruning, we are quasi AFTER the node, so in the presence 
of
                // branches, we need form the per-branch-choice groups by the 
choice
-               // they made at the latest unjoined branching node. Note that 
this is
+               // they made at the latest un-joined branching node. Note that 
this is
                // different from the check for branch compatibility of 
candidates, as
                // this happens on the input sub-plans and hence BEFORE the 
node (therefore
                // it is relevant to find the latest (partially) joined branch 
point.
@@ -708,12 +716,12 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
                                
                                @Override
                                public int compare(PlanNode o1, PlanNode o2) {
-                                       for (int i = 0; i < 
branchDeterminers.length; i++) {
-                                               PlanNode n1 = 
o1.getCandidateAtBranchPoint(branchDeterminers[i]);
-                                               PlanNode n2 = 
o2.getCandidateAtBranchPoint(branchDeterminers[i]);
+                                       for (OptimizerNode branchDeterminer : 
branchDeterminers) {
+                                               PlanNode n1 = 
o1.getCandidateAtBranchPoint(branchDeterminer);
+                                               PlanNode n2 = 
o2.getCandidateAtBranchPoint(branchDeterminer);
                                                int hash1 = 
System.identityHashCode(n1);
                                                int hash2 = 
System.identityHashCode(n2);
-                                               
+
                                                if (hash1 != hash2) {
                                                        return hash1 - hash2;
                                                }
@@ -775,8 +783,10 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
        
        protected void prunePlanAlternativesWithCommonBranching(List<PlanNode> 
plans) {
                // for each interesting property, which plans are cheapest
-               final RequestedGlobalProperties[] gps = 
(RequestedGlobalProperties[]) this.intProps.getGlobalProperties().toArray(new 
RequestedGlobalProperties[this.intProps.getGlobalProperties().size()]);
-               final RequestedLocalProperties[] lps = 
(RequestedLocalProperties[]) this.intProps.getLocalProperties().toArray(new 
RequestedLocalProperties[this.intProps.getLocalProperties().size()]);
+               final RequestedGlobalProperties[] gps = 
this.intProps.getGlobalProperties().toArray(
+                                                       new 
RequestedGlobalProperties[this.intProps.getGlobalProperties().size()]);
+               final RequestedLocalProperties[] lps = 
this.intProps.getLocalProperties().toArray(
+                                                       new 
RequestedLocalProperties[this.intProps.getLocalProperties().size()]);
                
                final PlanNode[][] toKeep = new PlanNode[gps.length][];
                final PlanNode[] cheapestForGlobal = new PlanNode[gps.length];
@@ -831,14 +841,12 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
                        plans.add(cheapest);
                        cheapest.setPruningMarker(); // remember that that plan 
is in the set
                }
-               
-               // skip the top down delta cost check for now (TODO: implement 
this)
+
                // add all others, which are optimal for some interesting 
properties
                for (int i = 0; i < gps.length; i++) {
                        if (toKeep[i] != null) {
                                final PlanNode[] localMatches = toKeep[i];
-                               for (int k = 0; k < localMatches.length; k++) {
-                                       final PlanNode n = localMatches[k];
+                               for (final PlanNode n : localMatches) {
                                        if (n != null && !n.isPruneMarkerSet()) 
{
                                                n.setPruningMarker();
                                                plans.add(n);
@@ -873,7 +881,7 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
        }
 
 
-       protected List<UnclosedBranchDescriptor> 
getBranchesForParent(PactConnection toParent) {
+       protected List<UnclosedBranchDescriptor> 
getBranchesForParent(DagConnection toParent) {
                if (this.outgoingConnections.size() == 1) {
                        // return our own stack of open branches, because 
nothing is added
                        if (this.openBranches == null || 
this.openBranches.isEmpty()) {
@@ -954,8 +962,8 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
         * a) There is no branch in the sub-plan of this node
         * b) Both candidates have the same candidate as the child at the last 
open branch. 
         * 
-        * @param plan1
-        * @param plan2
+        * @param plan1 The root node of the first candidate plan.
+        * @param plan2 The root node of the second candidate plan.
         * @return True if the nodes are branch compatible in the inputs.
         */
        protected boolean areBranchCompatible(PlanNode plan1, PlanNode plan2) {
@@ -1086,39 +1094,6 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
                Collections.reverse(result);
                return didCloseABranch;
        }
-       
-       /**
-        *
-        */
-       public static final class UnclosedBranchDescriptor
-       {
-               protected OptimizerNode branchingNode;
-
-               protected long joinedPathsVector;
-
-               /**
-                * @param branchingNode
-                * @param joinedPathsVector
-                */
-               protected UnclosedBranchDescriptor(OptimizerNode branchingNode, 
long joinedPathsVector)
-               {
-                       this.branchingNode = branchingNode;
-                       this.joinedPathsVector = joinedPathsVector;
-               }
-
-               public OptimizerNode getBranchingNode() {
-                       return this.branchingNode;
-               }
-
-               public long getJoinedPathsVector() {
-                       return this.joinedPathsVector;
-               }
-               
-               @Override
-               public String toString() {
-                       return "(" + this.branchingNode.getPactContract() + ") 
[" + this.joinedPathsVector + "]";
-               }
-       }
 
        @Override
        public OptimizerNode getOptimizerNode() {
@@ -1145,14 +1120,53 @@ public abstract class OptimizerNode implements 
Visitable<OptimizerNode>, Estimat
                StringBuilder bld = new StringBuilder();
 
                bld.append(getName());
-               bld.append(" (").append(getPactContract().getName()).append(") 
");
+               bld.append(" (").append(getOperator().getName()).append(") ");
 
                int i = 1; 
-               for (PactConnection conn : getIncomingConnections()) {
+               for (DagConnection conn : getIncomingConnections()) {
                        String shipStrategyName = conn.getShipStrategy() == 
null ? "null" : conn.getShipStrategy().name();
                        
bld.append('(').append(i++).append(":").append(shipStrategyName).append(')');
                }
 
                return bld.toString();
        }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Description of an unclosed branch. An unclosed branch is when the 
data flow branched (one operator's
+        * result is consumed by multiple targets), but these different 
branches (targets) have not been joined
+        * together.
+        */
+       public static final class UnclosedBranchDescriptor {
+
+               protected OptimizerNode branchingNode;
+
+               protected long joinedPathsVector;
+
+               /**
+                * Creates a new branching descriptor.
+                *
+                * @param branchingNode The node where the branch occurred (teh 
node with multiple outputs).
+                * @param joinedPathsVector A bit vector describing which 
branches are tracked by this descriptor.
+                *                          The bit vector is one, where the 
branch is tracked, zero otherwise.
+                */
+               protected UnclosedBranchDescriptor(OptimizerNode branchingNode, 
long joinedPathsVector) {
+                       this.branchingNode = branchingNode;
+                       this.joinedPathsVector = joinedPathsVector;
+               }
+
+               public OptimizerNode getBranchingNode() {
+                       return this.branchingNode;
+               }
+
+               public long getJoinedPathsVector() {
+                       return this.joinedPathsVector;
+               }
+
+               @Override
+               public String toString() {
+                       return "(" + this.branchingNode.getOperator() + ") [" + 
this.joinedPathsVector + "]";
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PactConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PactConnection.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PactConnection.java
deleted file mode 100644
index 661ceb5..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PactConnection.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.optimizer.dataproperties.InterestingProperties;
-import org.apache.flink.optimizer.plandump.DumpableConnection;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-
-/**
- * A connection between to operators. Represents an intermediate result
- * and a data exchange between the two operators.
- *
- * The data exchange has a mode in which it performs (batch / pipelined)
- *
- * The data exchange strategy may be set on this connection, in which case
- * it is fixed and will not be determined during candidate plan enumeration.
- *
- * During the enumeration of interesting properties, this connection also holds
- * all interesting properties generated by the successor operator.
- */
-public class PactConnection implements EstimateProvider, 
DumpableConnection<OptimizerNode> {
-       
-       private final OptimizerNode source; // The source node of the connection
-
-       private final OptimizerNode target; // The target node of the 
connection.
-
-       private final ExecutionMode dataExchangeMode; // defines whether to use 
batch or pipelined data exchange
-
-       private InterestingProperties interestingProps; // local properties 
that succeeding nodes are interested in
-
-       private ShipStrategyType shipStrategy; // The data shipping strategy, 
if predefined.
-       
-       private TempMode materializationMode = TempMode.NONE; // the 
materialization mode
-       
-       private int maxDepth = -1;
-
-       private boolean breakPipeline;  // whether this connection should break 
the pipeline due to potential deadlocks
-
-       /**
-        * Creates a new Connection between two nodes. The shipping strategy is 
by default <tt>NONE</tt>.
-        * The temp mode is by default <tt>NONE</tt>.
-        * 
-        * @param source
-        *        The source node.
-        * @param target
-        *        The target node.
-        */
-       public PactConnection(OptimizerNode source, OptimizerNode target, 
ExecutionMode exchangeMode) {
-               this(source, target, null, exchangeMode);
-       }
-
-       /**
-        * Creates a new Connection between two nodes.
-        * 
-        * @param source
-        *        The source node.
-        * @param target
-        *        The target node.
-        * @param shipStrategy
-        *        The shipping strategy.
-        * @param exchangeMode
-        *        The data exchange mode (pipelined / batch / batch only for 
shuffles / ... )
-        */
-       public PactConnection(OptimizerNode source, OptimizerNode target,
-                                                       ShipStrategyType 
shipStrategy, ExecutionMode exchangeMode)
-       {
-               if (source == null || target == null) {
-                       throw new NullPointerException("Source and target must 
not be null.");
-               }
-               this.source = source;
-               this.target = target;
-               this.shipStrategy = shipStrategy;
-               this.dataExchangeMode = exchangeMode;
-       }
-       
-       /**
-        * Constructor to create a result from an operator that is not
-        * consumed by another operator.
-        * 
-        * @param source
-        *        The source node.
-        */
-       public PactConnection(OptimizerNode source, ExecutionMode exchangeMode) 
{
-               if (source == null) {
-                       throw new NullPointerException("Source and target must 
not be null.");
-               }
-               this.source = source;
-               this.target = null;
-               this.shipStrategy = ShipStrategyType.NONE;
-               this.dataExchangeMode = exchangeMode;
-       }
-
-       /**
-        * Gets the source of the connection.
-        * 
-        * @return The source Node.
-        */
-       public OptimizerNode getSource() {
-               return this.source;
-       }
-
-       /**
-        * Gets the target of the connection.
-        * 
-        * @return The target node.
-        */
-       public OptimizerNode getTarget() {
-               return this.target;
-       }
-
-       /**
-        * Gets the shipping strategy for this connection.
-        * 
-        * @return The connection's shipping strategy.
-        */
-       public ShipStrategyType getShipStrategy() {
-               return this.shipStrategy;
-       }
-
-       /**
-        * Sets the shipping strategy for this connection.
-        * 
-        * @param strategy
-        *        The shipping strategy to be applied to this connection.
-        */
-       public void setShipStrategy(ShipStrategyType strategy) {
-               this.shipStrategy = strategy;
-       }
-
-       /**
-        * Gets the data exchange mode to use for this connection.
-        *
-        * @return The data exchange mode to use for this connection.
-        */
-       public ExecutionMode getDataExchangeMode() {
-               if (dataExchangeMode == null) {
-                       throw new IllegalStateException("This connection does 
not have the data exchange mode set");
-               }
-               return dataExchangeMode;
-       }
-
-       /**
-        * Marks that this connection should do a decoupled data exchange (such 
as batched)
-        * rather then pipeline data. Connections are marked as pipeline 
breakers to avoid
-        * deadlock situations.
-        */
-       public void markBreaksPipeline() {
-               this.breakPipeline = true;
-       }
-
-       /**
-        * Checks whether this connection is marked to break the pipeline.
-        *
-        * @return True, if this connection is marked to break the pipeline, 
false otherwise.
-        */
-       public boolean isBreakingPipeline() {
-               return this.breakPipeline;
-       }
-
-       /**
-        * Gets the interesting properties object for this pact connection.
-        * If the interesting properties for this connections have not yet been 
set,
-        * this method returns null.
-        * 
-        * @return The collection of all interesting properties, or null, if 
they have not yet been set.
-        */
-       public InterestingProperties getInterestingProperties() {
-               return this.interestingProps;
-       }
-
-       /**
-        * Sets the interesting properties for this pact connection.
-        * 
-        * @param props The interesting properties.
-        */
-       public void setInterestingProperties(InterestingProperties props) {
-               if (this.interestingProps == null) {
-                       this.interestingProps = props;
-               } else {
-                       throw new IllegalStateException("Interesting Properties 
have already been set.");
-               }
-       }
-       
-       public void clearInterestingProperties() {
-               this.interestingProps = null;
-       }
-       
-       public void initMaxDepth() {
-               
-               if (this.maxDepth == -1) {
-                       this.maxDepth = this.source.getMaxDepth() + 1;
-               } else {
-                       throw new IllegalStateException("Maximum path depth has 
already been initialized.");
-               }
-       }
-       
-       public int getMaxDepth() {
-               if (this.maxDepth != -1) {
-                       return this.maxDepth;
-               } else {
-                       throw new IllegalStateException("Maximum path depth has 
not been initialized.");
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Estimates
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public long getEstimatedOutputSize() {
-               return this.source.getEstimatedOutputSize();
-       }
-
-       @Override
-       public long getEstimatedNumRecords() {
-               return this.source.getEstimatedNumRecords();
-       }
-       
-       @Override
-       public float getEstimatedAvgWidthPerOutputRecord() {
-               return this.source.getEstimatedAvgWidthPerOutputRecord();
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-
-       
-       public TempMode getMaterializationMode() {
-               return this.materializationMode;
-       }
-       
-       public void setMaterializationMode(TempMode materializationMode) {
-               this.materializationMode = materializationMode;
-       }
-       
-       public boolean isOnDynamicPath() {
-               return this.source.isOnDynamicPath();
-       }
-       
-       public int getCostWeight() {
-               return this.source.getCostWeight();
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       public String toString() {
-               StringBuilder buf = new StringBuilder(50);
-               buf.append("Connection: ");
-
-               if (this.source == null) {
-                       buf.append("null");
-               } else {
-                       buf.append(this.source.getPactContract().getName());
-                       
buf.append('(').append(this.source.getName()).append(')');
-               }
-
-               buf.append(" -> ");
-
-               if (this.shipStrategy != null) {
-                       buf.append('[');
-                       buf.append(this.shipStrategy.name());
-                       buf.append(']').append(' ');
-               }
-
-               if (this.target == null) {
-                       buf.append("null");
-               } else {
-                       buf.append(this.target.getPactContract().getName());
-                       
buf.append('(').append(this.target.getName()).append(')');
-               }
-
-               return buf.toString();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
index 00b54ac..5c811b0 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
@@ -49,13 +49,13 @@ public class PartitionNode extends SingleInputNode {
                super(operator);
                
                OperatorDescriptorSingle descr = new PartitionDescriptor(
-                                       
this.getPactContract().getPartitionMethod(), this.keys, 
operator.getCustomPartitioner());
+                                       
this.getOperator().getPartitionMethod(), this.keys, 
operator.getCustomPartitioner());
                this.possibleProperties = Collections.singletonList(descr);
        }
 
        @Override
-       public PartitionOperatorBase<?> getPactContract() {
-               return (PartitionOperatorBase<?>) super.getPactContract();
+       public PartitionOperatorBase<?> getOperator() {
+               return (PartitionOperatorBase<?>) super.getOperator();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
index 6f4c43d..1477038 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
@@ -62,8 +62,8 @@ public class ReduceNode extends SingleInputNode {
        // 
------------------------------------------------------------------------
 
        @Override
-       public ReduceOperatorBase<?, ?> getPactContract() {
-               return (ReduceOperatorBase<?, ?>) super.getPactContract();
+       public ReduceOperatorBase<?, ?> getOperator() {
+               return (ReduceOperatorBase<?, ?>) super.getOperator();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
index 9217beb..cc12bb8 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
@@ -36,7 +36,7 @@ import 
org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.CostEstimator;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
 import org.apache.flink.optimizer.dataproperties.InterestingProperties;
@@ -68,7 +68,7 @@ public abstract class SingleInputNode extends OptimizerNode {
        
        protected final FieldSet keys;                  // The set of key fields
        
-       protected PactConnection inConn;                // the input of the node
+       protected DagConnection inConn;                 // the input of the node
        
        // 
--------------------------------------------------------------------------------------------
        
@@ -103,8 +103,8 @@ public abstract class SingleInputNode extends OptimizerNode 
{
        // 
--------------------------------------------------------------------------------------------
 
        @Override
-       public SingleInputOperator<?, ?, ?> getPactContract() {
-               return (SingleInputOperator<?, ?, ?>) super.getPactContract();
+       public SingleInputOperator<?, ?, ?> getOperator() {
+               return (SingleInputOperator<?, ?, ?>) super.getOperator();
        }
        
        /**
@@ -112,7 +112,7 @@ public abstract class SingleInputNode extends OptimizerNode 
{
         * 
         * @return The input.
         */
-       public PactConnection getIncomingConnection() {
+       public DagConnection getIncomingConnection() {
                return this.inConn;
        }
 
@@ -121,7 +121,7 @@ public abstract class SingleInputNode extends OptimizerNode 
{
         * 
         * @param inConn The input connection to set.
         */
-       public void setIncomingConnection(PactConnection inConn) {
+       public void setIncomingConnection(DagConnection inConn) {
                this.inConn = inConn;
        }
        
@@ -139,14 +139,14 @@ public abstract class SingleInputNode extends 
OptimizerNode {
        }
 
        @Override
-       public List<PactConnection> getIncomingConnections() {
+       public List<DagConnection> getIncomingConnections() {
                return Collections.singletonList(this.inConn);
        }
        
 
        @Override
        public SemanticProperties getSemanticProperties() {
-               return getPactContract().getSemanticProperties();
+               return getOperator().getSemanticProperties();
        }
        
 
@@ -155,18 +155,18 @@ public abstract class SingleInputNode extends 
OptimizerNode {
                        throws CompilerException
        {
                // see if an internal hint dictates the strategy to use
-               final Configuration conf = getPactContract().getParameters();
-               final String shipStrategy = 
conf.getString(PactCompiler.HINT_SHIP_STRATEGY, null);
+               final Configuration conf = getOperator().getParameters();
+               final String shipStrategy = 
conf.getString(Optimizer.HINT_SHIP_STRATEGY, null);
                final ShipStrategyType preSet;
                
                if (shipStrategy != null) {
-                       if 
(shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH))
 {
+                       if 
(shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH)) {
                                preSet = ShipStrategyType.PARTITION_HASH;
-                       } else if 
(shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE))
 {
+                       } else if 
(shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE)) 
{
                                preSet = ShipStrategyType.PARTITION_RANGE;
-                       } else if 
(shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_FORWARD)) {
+                       } else if 
(shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_FORWARD)) {
                                preSet = ShipStrategyType.FORWARD;
-                       } else if 
(shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION)) {
+                       } else if 
(shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
                                preSet = ShipStrategyType.PARTITION_RANDOM;
                        } else {
                                throw new CompilerException("Unrecognized ship 
strategy hint: " + shipStrategy);
@@ -176,15 +176,15 @@ public abstract class SingleInputNode extends 
OptimizerNode {
                }
                
                // get the predecessor node
-               Operator<?> children = ((SingleInputOperator<?, ?, ?>) 
getPactContract()).getInput();
+               Operator<?> children = ((SingleInputOperator<?, ?, ?>) 
getOperator()).getInput();
                
                OptimizerNode pred;
-               PactConnection conn;
+               DagConnection conn;
                if (children == null) {
-                       throw new CompilerException("Error: Node for '" + 
getPactContract().getName() + "' has no input.");
+                       throw new CompilerException("Error: Node for '" + 
getOperator().getName() + "' has no input.");
                } else {
                        pred = contractToNode.get(children);
-                       conn = new PactConnection(pred, this, 
defaultExchangeMode);
+                       conn = new DagConnection(pred, this, 
defaultExchangeMode);
                        if (preSet != null) {
                                conn.setShipStrategy(preSet);
                        }
@@ -230,7 +230,7 @@ public abstract class SingleInputNode extends OptimizerNode 
{
                }
                this.inConn.setInterestingProperties(props);
                
-               for (PactConnection conn : getBroadcastConnections()) {
+               for (DagConnection conn : getBroadcastConnections()) {
                        conn.setInterestingProperties(new 
InterestingProperties());
                }
        }
@@ -251,11 +251,11 @@ public abstract class SingleInputNode extends 
OptimizerNode {
                
                // calculate alternative sub-plans for broadcast inputs
                final List<Set<? extends NamedChannel>> broadcastPlanChannels = 
new ArrayList<Set<? extends NamedChannel>>();
-               List<PactConnection> broadcastConnections = 
getBroadcastConnections();
+               List<DagConnection> broadcastConnections = 
getBroadcastConnections();
                List<String> broadcastConnectionNames = 
getBroadcastConnectionNames();
 
                for (int i = 0; i < broadcastConnections.size(); i++ ) {
-                       PactConnection broadcastConnection = 
broadcastConnections.get(i);
+                       DagConnection broadcastConnection = 
broadcastConnections.get(i);
                        String broadcastConnectionName = 
broadcastConnectionNames.get(i);
                        List<PlanNode> broadcastPlanCandidates = 
broadcastConnection.getSource().getAlternativePlans(estimator);
 
@@ -283,8 +283,8 @@ public abstract class SingleInputNode extends OptimizerNode 
{
 
                final ExecutionMode executionMode = 
this.inConn.getDataExchangeMode();
 
-               final int dop = getDegreeOfParallelism();
-               final int inDop = getPredecessorNode().getDegreeOfParallelism();
+               final int dop = getParallelism();
+               final int inDop = getPredecessorNode().getParallelism();
                final boolean dopChange = inDop != dop;
 
                final boolean breaksPipeline = this.inConn.isBreakingPipeline();
@@ -509,7 +509,7 @@ public abstract class SingleInputNode extends OptimizerNode 
{
                        } else {
                                throw new CompilerException();
                        }
-                       for (PactConnection connection : 
getBroadcastConnections()) {
+                       for (DagConnection connection : 
getBroadcastConnections()) {
                                connection.getSource().accept(visitor);
                        }
                        visitor.postVisit(this);

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
index 2d65b4d..40725ba 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
@@ -31,8 +31,9 @@ import org.apache.flink.optimizer.util.NoOpBinaryUdfOp;
 import org.apache.flink.types.Nothing;
 
 /**
- * This class represents a utility node that is not part of the actual plan. 
It is used for plans with multiple data sinks to
- * transform it into a plan with a single root node. That way, the code that 
makes sure no costs are double-counted and that 
+ * This class represents a utility node that is not part of the actual plan.
+ * It is used for plans with multiple data sinks to transform it into a plan 
with
+ * a single root node. That way, the code that makes sure no costs are 
double-counted and that
  * candidate selection works correctly with nodes that have multiple outputs 
is transparently reused.
  */
 public class SinkJoiner extends TwoInputNode {
@@ -40,8 +41,8 @@ public class SinkJoiner extends TwoInputNode {
        public SinkJoiner(OptimizerNode input1, OptimizerNode input2) {
                super(new NoOpBinaryUdfOp<Nothing>(new NothingTypeInfo()));
 
-               PactConnection conn1 = new PactConnection(input1, this, null, 
ExecutionMode.PIPELINED);
-               PactConnection conn2 = new PactConnection(input2, this, null, 
ExecutionMode.PIPELINED);
+               DagConnection conn1 = new DagConnection(input1, this, null, 
ExecutionMode.PIPELINED);
+               DagConnection conn2 = new DagConnection(input2, this, null, 
ExecutionMode.PIPELINED);
                
                this.input1 = conn1;
                this.input2 = conn2;
@@ -55,7 +56,7 @@ public class SinkJoiner extends TwoInputNode {
        }
        
        @Override
-       public List<PactConnection> getOutgoingConnections() {
+       public List<DagConnection> getOutgoingConnections() {
                return Collections.emptyList();
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
index df47e56..1292cf5 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
@@ -46,7 +46,7 @@ public class SolutionSetNode extends 
AbstractPartialSolutionNode {
        // 
--------------------------------------------------------------------------------------------
        
        public void setCandidateProperties(GlobalProperties gProps, 
LocalProperties lProps, Channel initialInput) {
-               this.cachedPlans = Collections.<PlanNode>singletonList(new 
SolutionSetPlanNode(this, "SolutionSet("+this.getPactContract().getName()+")", 
gProps, lProps, initialInput));
+               this.cachedPlans = Collections.<PlanNode>singletonList(new 
SolutionSetPlanNode(this, "SolutionSet("+this.getOperator().getName()+")", 
gProps, lProps, initialInput));
        }
        
        public SolutionSetPlanNode getCurrentSolutionSetPlanNode() {
@@ -74,8 +74,8 @@ public class SolutionSetNode extends 
AbstractPartialSolutionNode {
         * @return The contract.
         */
        @Override
-       public SolutionSetPlaceHolder<?> getPactContract() {
-               return (SolutionSetPlaceHolder<?>) super.getPactContract();
+       public SolutionSetPlaceHolder<?> getOperator() {
+               return (SolutionSetPlaceHolder<?>) super.getOperator();
        }
 
        @Override
@@ -89,7 +89,7 @@ public class SolutionSetNode extends 
AbstractPartialSolutionNode {
                        return;
                }
 
-               PactConnection solutionSetInput = 
this.iterationNode.getFirstIncomingConnection();
+               DagConnection solutionSetInput = 
this.iterationNode.getFirstIncomingConnection();
                OptimizerNode solutionSetSource = solutionSetInput.getSource();
                
                addClosedBranches(solutionSetSource.closedBranchingNodes);

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
index 17bc8f1..83bc39a 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
@@ -51,8 +51,8 @@ public class SortPartitionNode extends SingleInputNode {
        }
 
        @Override
-       public SortPartitionOperatorBase<?> getPactContract() {
-               return (SortPartitionOperatorBase<?>) super.getPactContract();
+       public SortPartitionOperatorBase<?> getOperator() {
+               return (SortPartitionOperatorBase<?>) super.getOperator();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
index d51f2de..39da165 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
@@ -36,7 +36,7 @@ import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.CostEstimator;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
 import org.apache.flink.optimizer.dataproperties.InterestingProperties;
@@ -70,9 +70,9 @@ public abstract class TwoInputNode extends OptimizerNode {
        
        protected final FieldList keys2; // The set of key fields for the 
second input
        
-       protected PactConnection input1; // The first input edge
+       protected DagConnection input1; // The first input edge
 
-       protected PactConnection input2; // The second input edge
+       protected DagConnection input2; // The second input edge
        
        private List<OperatorDescriptorDual> cachedDescriptors;
        
@@ -109,8 +109,8 @@ public abstract class TwoInputNode extends OptimizerNode {
        // 
------------------------------------------------------------------------
 
        @Override
-       public DualInputOperator<?, ?, ?, ?> getPactContract() {
-               return (DualInputOperator<?, ?, ?, ?>) super.getPactContract();
+       public DualInputOperator<?, ?, ?, ?> getOperator() {
+               return (DualInputOperator<?, ?, ?, ?>) super.getOperator();
        }
 
        /**
@@ -118,7 +118,7 @@ public abstract class TwoInputNode extends OptimizerNode {
         * 
         * @return The first input connection.
         */
-       public PactConnection getFirstIncomingConnection() {
+       public DagConnection getFirstIncomingConnection() {
                return this.input1;
        }
 
@@ -127,7 +127,7 @@ public abstract class TwoInputNode extends OptimizerNode {
         * 
         * @return The second input connection.
         */
-       public PactConnection getSecondIncomingConnection() {
+       public DagConnection getSecondIncomingConnection() {
                return this.input2;
        }
        
@@ -148,8 +148,8 @@ public abstract class TwoInputNode extends OptimizerNode {
        }
 
        @Override
-       public List<PactConnection> getIncomingConnections() {
-               ArrayList<PactConnection> inputs = new 
ArrayList<PactConnection>(2);
+       public List<DagConnection> getIncomingConnections() {
+               ArrayList<DagConnection> inputs = new 
ArrayList<DagConnection>(2);
                inputs.add(input1);
                inputs.add(input2);
                return inputs;
@@ -159,21 +159,21 @@ public abstract class TwoInputNode extends OptimizerNode {
        @Override
        public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, 
ExecutionMode defaultExecutionMode) {
                // see if there is a hint that dictates which shipping strategy 
to use for BOTH inputs
-               final Configuration conf = getPactContract().getParameters();
+               final Configuration conf = getOperator().getParameters();
                ShipStrategyType preSet1 = null;
                ShipStrategyType preSet2 = null;
                
-               String shipStrategy = 
conf.getString(PactCompiler.HINT_SHIP_STRATEGY, null);
+               String shipStrategy = 
conf.getString(Optimizer.HINT_SHIP_STRATEGY, null);
                if (shipStrategy != null) {
-                       if 
(PactCompiler.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
+                       if 
(Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
                                preSet1 = preSet2 = ShipStrategyType.FORWARD;
-                       } else if 
(PactCompiler.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
+                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
                                preSet1 = preSet2 = ShipStrategyType.BROADCAST;
-                       } else if 
(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
+                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
                                preSet1 = preSet2 = 
ShipStrategyType.PARTITION_HASH;
-                       } else if 
(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
+                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
                                preSet1 = preSet2 = 
ShipStrategyType.PARTITION_RANGE;
-                       } else if 
(shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION)) {
+                       } else if 
(shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
                                preSet1 = preSet2 = 
ShipStrategyType.PARTITION_RANDOM;
                        } else {
                                throw new CompilerException("Unknown hint for 
shipping strategy: " + shipStrategy);
@@ -181,17 +181,17 @@ public abstract class TwoInputNode extends OptimizerNode {
                }
 
                // see if there is a hint that dictates which shipping strategy 
to use for the FIRST input
-               shipStrategy = 
conf.getString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT, null);
+               shipStrategy = 
conf.getString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, null);
                if (shipStrategy != null) {
-                       if 
(PactCompiler.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
+                       if 
(Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
                                preSet1 = ShipStrategyType.FORWARD;
-                       } else if 
(PactCompiler.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
+                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
                                preSet1 = ShipStrategyType.BROADCAST;
-                       } else if 
(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
+                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
                                preSet1 = ShipStrategyType.PARTITION_HASH;
-                       } else if 
(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
+                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
                                preSet1 = ShipStrategyType.PARTITION_RANGE;
-                       } else if 
(shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION)) {
+                       } else if 
(shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
                                preSet1 = ShipStrategyType.PARTITION_RANDOM;
                        } else {
                                throw new CompilerException("Unknown hint for 
shipping strategy of input one: " + shipStrategy);
@@ -199,17 +199,17 @@ public abstract class TwoInputNode extends OptimizerNode {
                }
 
                // see if there is a hint that dictates which shipping strategy 
to use for the SECOND input
-               shipStrategy = 
conf.getString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT, null);
+               shipStrategy = 
conf.getString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, null);
                if (shipStrategy != null) {
-                       if 
(PactCompiler.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
+                       if 
(Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
                                preSet2 = ShipStrategyType.FORWARD;
-                       } else if 
(PactCompiler.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
+                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
                                preSet2 = ShipStrategyType.BROADCAST;
-                       } else if 
(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
+                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
                                preSet2 = ShipStrategyType.PARTITION_HASH;
-                       } else if 
(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
+                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
                                preSet2 = ShipStrategyType.PARTITION_RANGE;
-                       } else if 
(shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION)) {
+                       } else if 
(shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
                                preSet2 = ShipStrategyType.PARTITION_RANDOM;
                        } else {
                                throw new CompilerException("Unknown hint for 
shipping strategy of input two: " + shipStrategy);
@@ -217,18 +217,18 @@ public abstract class TwoInputNode extends OptimizerNode {
                }
                
                // get the predecessors
-               DualInputOperator<?, ?, ?, ?> contr = getPactContract();
+               DualInputOperator<?, ?, ?, ?> contr = getOperator();
                
                Operator<?> leftPred = contr.getFirstInput();
                Operator<?> rightPred = contr.getSecondInput();
                
                OptimizerNode pred1;
-               PactConnection conn1;
+               DagConnection conn1;
                if (leftPred == null) {
-                       throw new CompilerException("Error: Node for '" + 
getPactContract().getName() + "' has no input set for first input.");
+                       throw new CompilerException("Error: Node for '" + 
getOperator().getName() + "' has no input set for first input.");
                } else {
                        pred1 = contractToNode.get(leftPred);
-                       conn1 = new PactConnection(pred1, this, 
defaultExecutionMode);
+                       conn1 = new DagConnection(pred1, this, 
defaultExecutionMode);
                        if (preSet1 != null) {
                                conn1.setShipStrategy(preSet1);
                        }
@@ -239,12 +239,12 @@ public abstract class TwoInputNode extends OptimizerNode {
                pred1.addOutgoingConnection(conn1);
                
                OptimizerNode pred2;
-               PactConnection conn2;
+               DagConnection conn2;
                if (rightPred == null) {
-                       throw new CompilerException("Error: Node for '" + 
getPactContract().getName() + "' has no input set for second input.");
+                       throw new CompilerException("Error: Node for '" + 
getOperator().getName() + "' has no input set for second input.");
                } else {
                        pred2 = contractToNode.get(rightPred);
-                       conn2 = new PactConnection(pred2, this, 
defaultExecutionMode);
+                       conn2 = new DagConnection(pred2, this, 
defaultExecutionMode);
                        if (preSet2 != null) {
                                conn2.setShipStrategy(preSet2);
                        }
@@ -290,7 +290,7 @@ public abstract class TwoInputNode extends OptimizerNode {
                this.input1.setInterestingProperties(props1);
                this.input2.setInterestingProperties(props2);
                
-               for (PactConnection conn : getBroadcastConnections()) {
+               for (DagConnection conn : getBroadcastConnections()) {
                        conn.setInterestingProperties(new 
InterestingProperties());
                }
        }
@@ -314,11 +314,11 @@ public abstract class TwoInputNode extends OptimizerNode {
                
                // calculate alternative sub-plans for broadcast inputs
                final List<Set<? extends NamedChannel>> broadcastPlanChannels = 
new ArrayList<Set<? extends NamedChannel>>();
-               List<PactConnection> broadcastConnections = 
getBroadcastConnections();
+               List<DagConnection> broadcastConnections = 
getBroadcastConnections();
                List<String> broadcastConnectionNames = 
getBroadcastConnectionNames();
 
                for (int i = 0; i < broadcastConnections.size(); i++ ) {
-                       PactConnection broadcastConnection = 
broadcastConnections.get(i);
+                       DagConnection broadcastConnection = 
broadcastConnections.get(i);
                        String broadcastConnectionName = 
broadcastConnectionNames.get(i);
                        List<PlanNode> broadcastPlanCandidates = 
broadcastConnection.getSource().getAlternativePlans(estimator);
 
@@ -352,9 +352,9 @@ public abstract class TwoInputNode extends OptimizerNode {
                final ExecutionMode input1Mode = 
this.input1.getDataExchangeMode();
                final ExecutionMode input2Mode = 
this.input2.getDataExchangeMode();
 
-               final int dop = getDegreeOfParallelism();
-               final int inDop1 = 
getFirstPredecessorNode().getDegreeOfParallelism();
-               final int inDop2 = 
getSecondPredecessorNode().getDegreeOfParallelism();
+               final int dop = getParallelism();
+               final int inDop1 = getFirstPredecessorNode().getParallelism();
+               final int inDop2 = getSecondPredecessorNode().getParallelism();
 
                final boolean dopChange1 = dop != inDop1;
                final boolean dopChange2 = dop != inDop2;
@@ -720,7 +720,7 @@ public abstract class TwoInputNode extends OptimizerNode {
 
        @Override
        public SemanticProperties getSemanticProperties() {
-               return getPactContract().getSemanticProperties();
+               return getOperator().getSemanticProperties();
        }
        
        // 
--------------------------------------------------------------------------------------------
@@ -737,7 +737,7 @@ public abstract class TwoInputNode extends OptimizerNode {
                        getFirstPredecessorNode().accept(visitor);
                        getSecondPredecessorNode().accept(visitor);
                        
-                       for (PactConnection connection : 
getBroadcastConnections()) {
+                       for (DagConnection connection : 
getBroadcastConnections()) {
                                connection.getSource().accept(visitor);
                        }
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
index 2ec36b1..71a49f3 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler.InterestingPropertyVisitor;
+import org.apache.flink.optimizer.Optimizer.InterestingPropertyVisitor;
 import org.apache.flink.optimizer.costs.CostEstimator;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
 import org.apache.flink.optimizer.dataproperties.InterestingProperties;
@@ -80,9 +80,9 @@ public class WorksetIterationNode extends TwoInputNode 
implements IterationNode
        
        private OptimizerNode nextWorkset;
        
-       private PactConnection solutionSetDeltaRootConnection;
+       private DagConnection solutionSetDeltaRootConnection;
        
-       private PactConnection nextWorksetRootConnection;
+       private DagConnection nextWorksetRootConnection;
        
        private SingleRootJoiner singleRoot;
        
@@ -122,7 +122,7 @@ public class WorksetIterationNode extends TwoInputNode 
implements IterationNode
        // 
--------------------------------------------------------------------------------------------
        
        public DeltaIterationBase<?, ?> getIterationContract() {
-               return (DeltaIterationBase<?, ?>) getPactContract();
+               return (DeltaIterationBase<?, ?>) getOperator();
        }
        
        public SolutionSetNode getSolutionSetNode() {
@@ -167,9 +167,9 @@ public class WorksetIterationNode extends TwoInputNode 
implements IterationNode
                // if the next workset is equal to the workset, we need to 
inject a no-op node
                if (nextWorkset == worksetNode || nextWorkset instanceof 
BinaryUnionNode) {
                        NoOpNode noop = new NoOpNode();
-                       noop.setDegreeOfParallelism(getDegreeOfParallelism());
+                       noop.setDegreeOfParallelism(getParallelism());
 
-                       PactConnection noOpConn = new 
PactConnection(nextWorkset, noop, executionMode);
+                       DagConnection noOpConn = new DagConnection(nextWorkset, 
noop, executionMode);
                        noop.setIncomingConnection(noOpConn);
                        nextWorkset.addOutgoingConnection(noOpConn);
                        
@@ -179,9 +179,9 @@ public class WorksetIterationNode extends TwoInputNode 
implements IterationNode
                // attach an extra node to the solution set delta for the cases 
where we need to repartition
                UnaryOperatorNode solutionSetDeltaUpdateAux = new 
UnaryOperatorNode("Solution-Set Delta", getSolutionSetKeyFields(),
                                new 
SolutionSetDeltaOperator(getSolutionSetKeyFields()));
-               
solutionSetDeltaUpdateAux.setDegreeOfParallelism(getDegreeOfParallelism());
+               
solutionSetDeltaUpdateAux.setDegreeOfParallelism(getParallelism());
 
-               PactConnection conn = new PactConnection(solutionSetDelta, 
solutionSetDeltaUpdateAux, executionMode);
+               DagConnection conn = new DagConnection(solutionSetDelta, 
solutionSetDeltaUpdateAux, executionMode);
                solutionSetDeltaUpdateAux.setIncomingConnection(conn);
                solutionSetDelta.addOutgoingConnection(conn);
                
@@ -189,10 +189,10 @@ public class WorksetIterationNode extends TwoInputNode 
implements IterationNode
                this.nextWorkset = nextWorkset;
                
                this.singleRoot = new SingleRootJoiner();
-               this.solutionSetDeltaRootConnection = new 
PactConnection(solutionSetDeltaUpdateAux,
+               this.solutionSetDeltaRootConnection = new 
DagConnection(solutionSetDeltaUpdateAux,
                                                                                
                        this.singleRoot, executionMode);
 
-               this.nextWorksetRootConnection = new 
PactConnection(nextWorkset, this.singleRoot, executionMode);
+               this.nextWorksetRootConnection = new DagConnection(nextWorkset, 
this.singleRoot, executionMode);
                this.singleRoot.setInputs(this.solutionSetDeltaRootConnection, 
this.nextWorksetRootConnection);
                
                
solutionSetDeltaUpdateAux.addOutgoingConnection(this.solutionSetDeltaRootConnection);
@@ -371,7 +371,7 @@ public class WorksetIterationNode extends TwoInputNode 
implements IterationNode
                                        UnaryOperatorNode 
rebuildWorksetPropertiesNode = new UnaryOperatorNode("Rebuild Workset 
Properties",
                                                                                
                                                                                
                        FieldList.EMPTY_LIST);
                                        
-                                       
rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism());
+                                       
rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getParallelism());
                                        
                                        SingleInputPlanNode 
rebuildWorksetPropertiesPlanNode = new SingleInputPlanNode(
                                                                                
                rebuildWorksetPropertiesNode, "Rebuild Workset Properties",
@@ -454,7 +454,7 @@ public class WorksetIterationNode extends TwoInputNode 
implements IterationNode
                                        }
                                        
                                        WorksetIterationPlanNode wsNode = new 
WorksetIterationPlanNode(this,
-                                                       "WorksetIteration 
("+this.getPactContract().getName()+")", solutionSetIn,
+                                                       "WorksetIteration 
("+this.getOperator().getName()+")", solutionSetIn,
                                                        worksetIn, sspn, wspn, 
worksetCandidate, solutionSetCandidate);
                                        
wsNode.setImmediateSolutionSetUpdate(immediateDeltaUpdate);
                                        wsNode.initProperties(gp, lp);
@@ -566,7 +566,7 @@ public class WorksetIterationNode extends TwoInputNode 
implements IterationNode
                        setDegreeOfParallelism(1);
                }
                
-               public void setInputs(PactConnection input1, PactConnection 
input2) {
+               public void setInputs(DagConnection input1, DagConnection 
input2) {
                        this.input1 = input1;
                        this.input2 = input2;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
index bd39858..3b05aba 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
@@ -49,7 +49,7 @@ public class WorksetNode extends AbstractPartialSolutionNode {
                if (this.cachedPlans != null) {
                        throw new IllegalStateException();
                } else {
-                       WorksetPlanNode wspn = new WorksetPlanNode(this, 
"Workset ("+this.getPactContract().getName()+")", gProps, lProps, initialInput);
+                       WorksetPlanNode wspn = new WorksetPlanNode(this, 
"Workset ("+this.getOperator().getName()+")", gProps, lProps, initialInput);
                        this.cachedPlans = 
Collections.<PlanNode>singletonList(wspn);
                }
        }
@@ -79,8 +79,8 @@ public class WorksetNode extends AbstractPartialSolutionNode {
         * @return The contract.
         */
        @Override
-       public WorksetPlaceHolder<?> getPactContract() {
-               return (WorksetPlaceHolder<?>) super.getPactContract();
+       public WorksetPlaceHolder<?> getOperator() {
+               return (WorksetPlaceHolder<?>) super.getOperator();
        }
 
        @Override
@@ -94,7 +94,7 @@ public class WorksetNode extends AbstractPartialSolutionNode {
                        return;
                }
 
-               PactConnection worksetInput = 
this.iterationNode.getSecondIncomingConnection();
+               DagConnection worksetInput = 
this.iterationNode.getSecondIncomingConnection();
                OptimizerNode worksetSource = worksetInput.getSource();
                
                addClosedBranches(worksetSource.closedBranchingNodes);

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
index a547f04..57ba29d 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
@@ -185,11 +185,7 @@ public class GlobalProperties implements Cloneable {
        }
 
        public boolean isExactlyPartitionedOnFields(FieldList fields) {
-               if (this.partitioning.isPartitionedOnKey() && 
fields.isExactMatch(this.partitioningFields)) {
-                       return true;
-               } else {
-                       return false;
-               }
+               return this.partitioning.isPartitionedOnKey() && 
fields.isExactMatch(this.partitioningFields);
        }
        
        public boolean matchesOrderedPartitioning(Ordering o) {

Reply via email to