http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java
index ced0e83..6946641 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer.dataproperties;
 
 import java.util.HashSet;
@@ -29,13 +28,16 @@ import org.apache.flink.optimizer.dag.SingleInputNode;
 import org.apache.flink.optimizer.dag.TwoInputNode;
 
 /**
- * The interesting properties that a node in the optimizer plan hands to its 
predecessors. It has the
- * purpose to tell the preceding nodes, which data properties might have the 
advantage, because they would
- * let the node fulfill its pact cheaper. More on optimization with 
interesting properties can be found
- * in the works on the volcano- and cascades optimizer framework.
+ * Interesting properties are propagated from parent operators to child 
operators. They tell the child
+ * what data properties would help the parent in operating in a cheaper 
fashion. A reduce operator, for
+ * example, tells its child that partitioned data would help. If the child is 
a join operator, it can use
+ * that knowledge to favor strategies that leave the data in a partitioned 
form.
+ *
+ * More on optimization with interesting properties can be found in the works 
on
+ * the volcano- and cascades optimizer framework.
  */
-public class InterestingProperties implements Cloneable
-{
+public class InterestingProperties implements Cloneable  {
+
        private Set<RequestedGlobalProperties> globalProps; // the global 
properties, i.e. properties across partitions
 
        private Set<RequestedLocalProperties> localProps; // the local 
properties, i.e. properties within partitions
@@ -91,8 +93,7 @@ public class InterestingProperties implements Cloneable
                return this.globalProps;
        }
 
-       public InterestingProperties filterByCodeAnnotations(OptimizerNode 
node, int input)
-       {
+       public InterestingProperties filterByCodeAnnotations(OptimizerNode 
node, int input) {
                InterestingProperties iProps = new InterestingProperties();
                SemanticProperties props;
                if (node instanceof SingleInputNode || node instanceof 
TwoInputNode) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java
index 0c3ea12..e0231aa 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * This class represents local properties of the data. A local property is a 
property that exists
- * within the data of a single partition.
+ * within the data of a single partition, such as sort order, or data grouping.
  */
 public class LocalProperties implements Cloneable {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java
index 674cdb8..5e06dd3 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java
@@ -19,42 +19,45 @@
 package org.apache.flink.optimizer.dataproperties;
 
 /**
- * An enumeration tracking the different types of sharding strategies.
+ * An enumeration of the the different types of distributing data across 
partitions or
+ * parallel workers.
  */
 public enum PartitioningProperty {
 
        /**
-        * Any data distribution, i.e., random partitioning or full replication.
+        * Any possible way of data distribution, including random partitioning 
and full replication.
         */
        ANY_DISTRIBUTION,
 
        /**
-        * Constant indicating no particular partitioning (i.e. random) data 
distribution.
+        * A random disjunct (non-replicated) data distribution, where each 
datum is contained in one partition only.
+        * This is for example the result of parallel scans of data in a file 
system like HDFS,
+        * or the result of a round-robin data distribution.
         */
        RANDOM_PARTITIONED,
 
        /**
-        * Constant indicating a hash partitioning.
+        * A hash partitioning on a certain key.
         */
        HASH_PARTITIONED,
 
        /**
-        * Constant indicating a range partitioning.
+        * A range partitioning on a certain key.
         */
        RANGE_PARTITIONED,
 
        /**
-        * Constant indicating any not further specified disjunct partitioning.
+        * A not further specified partitioning on a key (hash-, or range 
partitioning, or some other scheme even).
         */
        ANY_PARTITIONING,
        
        /**
-        * Constant indicating full replication of the data to each parallel 
instance.
+        *Full replication of the data to each parallel instance.
         */
        FULL_REPLICATION,
 
        /**
-        * Constant indicating a forced even re-balancing.
+        * A forced even re-balancing. All partitions are guaranteed to have 
almost the same number of records.
         */
        FORCED_REBALANCED,
        
@@ -95,10 +98,10 @@ public enum PartitioningProperty {
 
        /**
         * Checks, if this property represents a partitioning that is 
computable.
-        * Computable partitionings can be recreated through an algorithm. If 
two sets of data are to
+        * A computable partitioning can be recreated through an algorithm. If 
two sets of data are to
         * be co-partitioned, it is crucial, that the partitioning schemes are 
computable.
         * <p>
-        * Examples for computable partitioning schemes are hash- or 
range-partitionings. An example for a non-computable
+        * Examples for computable partitioning schemes are hash- or 
range-partitioning. An example for a non-computable
         * partitioning is the implicit partitioning that exists though a 
globally unique key.
         * 
         * @return True, if this enum constant is a re-computable partitioning.

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java
index 102a0f0..a9d14e8 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java
@@ -29,14 +29,16 @@ import org.apache.flink.optimizer.util.Utils;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 
 /**
- * This class represents local properties of the data. A local property is a 
property that exists
- * within the data of a single partition.
+ * This class represents the local properties of the data that are requested 
by an operator.
+ * Local properties are the properties within one partition.
+ * Operators request the local properties they need for correct execution. 
Here are some example local
+ * properties requested by certain operators:
+ * <ul>
+ *     <li>"groupBy/reduce" will request the data to be grouped on the key 
fields.</li>
+ *     <li>A sort-merge join will request the data from each input to be 
sorted on the respective join key.</li>
+ * </ul>
  */
 public class RequestedLocalProperties implements Cloneable {
-
-       public static final RequestedLocalProperties DEFAULT_PROPERTIES = null;
-       
-       // 
--------------------------------------------------------------------------------------------
        
        private Ordering ordering;                      // order inside a 
partition, null if not ordered
 
@@ -205,9 +207,9 @@ public class RequestedLocalProperties implements Cloneable {
        }
        
        /**
-        * Parameterizes the local strategy fields of a channel such that the 
channel produces the desired local properties.
+        * Parametrizes the local strategy fields of a channel such that the 
channel produces the desired local properties.
         * 
-        * @param channel The channel to parameterize.
+        * @param channel The channel to parametrize.
         */
        public void parameterizeChannel(Channel channel) {
                LocalProperties current = channel.getLocalProperties();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupCombineProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupCombineProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupCombineProperties.java
index eaf11ac..9ee56b0 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupCombineProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupCombineProperties.java
@@ -40,7 +40,7 @@ public final class AllGroupCombineProperties extends 
OperatorDescriptorSingle {
 
        @Override
        public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               return new SingleInputPlanNode(node, "GroupCombine 
("+node.getPactContract().getName()+")", in, DriverStrategy.ALL_GROUP_COMBINE);
+               return new SingleInputPlanNode(node, "GroupCombine 
("+node.getOperator().getName()+")", in, DriverStrategy.ALL_GROUP_COMBINE);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupReduceProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupReduceProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupReduceProperties.java
index 55d6fbb..9efd8c7 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupReduceProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupReduceProperties.java
@@ -40,7 +40,7 @@ public final class AllGroupReduceProperties extends 
OperatorDescriptorSingle {
 
        @Override
        public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               return new SingleInputPlanNode(node, "GroupReduce 
("+node.getPactContract().getName()+")", in, DriverStrategy.ALL_GROUP_REDUCE);
+               return new SingleInputPlanNode(node, "GroupReduce 
("+node.getOperator().getName()+")", in, DriverStrategy.ALL_GROUP_REDUCE);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
index 7d07e7d..b3c083a 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
@@ -46,7 +46,7 @@ public final class AllGroupWithPartialPreGroupProperties 
extends OperatorDescrip
        public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
                if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
                        // locally connected, directly instantiate
-                       return new SingleInputPlanNode(node, "GroupReduce 
("+node.getPactContract().getName()+")",
+                       return new SingleInputPlanNode(node, "GroupReduce 
("+node.getOperator().getName()+")",
                                                                                
        in, DriverStrategy.ALL_GROUP_REDUCE);
                } else {
                        // non forward case.plug in a combiner
@@ -55,10 +55,10 @@ public final class AllGroupWithPartialPreGroupProperties 
extends OperatorDescrip
                        
                        // create an input node for combine with same DOP as 
input node
                        GroupReduceNode combinerNode = ((GroupReduceNode) 
node).getCombinerUtilityNode();
-                       
combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
+                       
combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
 
                        SingleInputPlanNode combiner = new 
SingleInputPlanNode(combinerNode,
-                                       "Combine 
("+node.getPactContract().getName()+")", toCombiner, 
DriverStrategy.ALL_GROUP_REDUCE_COMBINE);
+                                       "Combine 
("+node.getOperator().getName()+")", toCombiner, 
DriverStrategy.ALL_GROUP_REDUCE_COMBINE);
                        combiner.setCosts(new Costs(0, 0));
                        
combiner.initProperties(toCombiner.getGlobalProperties(), 
toCombiner.getLocalProperties());
                        
@@ -67,7 +67,7 @@ public final class AllGroupWithPartialPreGroupProperties 
extends OperatorDescrip
                                                                                
in.getShipStrategySortOrder(), in.getDataExchangeMode());
 
                        toReducer.setLocalStrategy(in.getLocalStrategy(), 
in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
-                       return new SingleInputPlanNode(node, "GroupReduce 
("+node.getPactContract().getName()+")",
+                       return new SingleInputPlanNode(node, "GroupReduce 
("+node.getOperator().getName()+")",
                                                                                
        toReducer, DriverStrategy.ALL_GROUP_REDUCE);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
index 4f6a4fd..a172a60 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
@@ -45,7 +45,7 @@ public final class AllReduceProperties extends 
OperatorDescriptorSingle {
        public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
                if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
                        // locally connected, directly instantiate
-                       return new SingleInputPlanNode(node, "Reduce 
("+node.getPactContract().getName()+")",
+                       return new SingleInputPlanNode(node, "Reduce 
("+node.getOperator().getName()+")",
                                                                                
        in, DriverStrategy.ALL_REDUCE);
                } else {
                        // non forward case.plug in a combiner
@@ -54,10 +54,10 @@ public final class AllReduceProperties extends 
OperatorDescriptorSingle {
                        
                        // create an input node for combine with same DOP as 
input node
                        ReduceNode combinerNode = ((ReduceNode) 
node).getCombinerUtilityNode();
-                       
combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
+                       
combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
 
                        SingleInputPlanNode combiner = new 
SingleInputPlanNode(combinerNode,
-                                       "Combine 
("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_REDUCE);
+                                       "Combine 
("+node.getOperator().getName()+")", toCombiner, DriverStrategy.ALL_REDUCE);
                        combiner.setCosts(new Costs(0, 0));
                        
combiner.initProperties(toCombiner.getGlobalProperties(), 
toCombiner.getLocalProperties());
                        
@@ -67,7 +67,7 @@ public final class AllReduceProperties extends 
OperatorDescriptorSingle {
                        toReducer.setLocalStrategy(in.getLocalStrategy(), 
in.getLocalStrategyKeys(),
                                                                                
in.getLocalStrategySortOrder());
 
-                       return new SingleInputPlanNode(node, "Reduce 
("+node.getPactContract().getName()+")",
+                       return new SingleInputPlanNode(node, "Reduce 
("+node.getOperator().getName()+")",
                                                                                
        toReducer, DriverStrategy.ALL_REDUCE);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
index c46ce56..f48e297 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
@@ -93,7 +93,7 @@ public abstract class CartesianProductDescriptor extends 
OperatorDescriptorDual
        
        @Override
        public DualInputPlanNode instantiate(Channel in1, Channel in2, 
TwoInputNode node) {
-               return new DualInputPlanNode(node, 
"Cross("+node.getPactContract().getName()+")", in1, in2, getStrategy());
+               return new DualInputPlanNode(node, 
"Cross("+node.getOperator().getName()+")", in1, in2, getStrategy());
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
index a17063e..368944e 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
@@ -215,7 +215,7 @@ public class CoGroupDescriptor extends 
OperatorDescriptorDual {
                        inputOrders = tmp;
                }
                
-               return new DualInputPlanNode(node, "CoGroup 
("+node.getPactContract().getName()+")", in1, in2, DriverStrategy.CO_GROUP, 
this.keys1, this.keys2, inputOrders);
+               return new DualInputPlanNode(node, "CoGroup 
("+node.getOperator().getName()+")", in1, in2, DriverStrategy.CO_GROUP, 
this.keys1, this.keys2, inputOrders);
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java
index ffae66f..8e7edeb 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java
@@ -60,7 +60,7 @@ public class CoGroupWithSolutionSetFirstDescriptor extends 
CoGroupDescriptor {
                        inputOrders = tmp;
                }
 
-               return new DualInputPlanNode(node, "CoGroup 
("+node.getPactContract().getName()+")", in1, in2, DriverStrategy.CO_GROUP, 
this.keys1, this.keys2, inputOrders);
+               return new DualInputPlanNode(node, "CoGroup 
("+node.getOperator().getName()+")", in1, in2, DriverStrategy.CO_GROUP, 
this.keys1, this.keys2, inputOrders);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
index 92f7474..bcd4d73 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
@@ -42,7 +42,7 @@ public class CollectorMapDescriptor extends 
OperatorDescriptorSingle {
 
        @Override
        public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               return new SingleInputPlanNode(node, "Map 
("+node.getPactContract().getName()+")", in, DriverStrategy.COLLECTOR_MAP);
+               return new SingleInputPlanNode(node, "Map 
("+node.getOperator().getName()+")", in, DriverStrategy.COLLECTOR_MAP);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
index e7ff210..81c823f 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
@@ -41,7 +41,7 @@ public class FilterDescriptor extends 
OperatorDescriptorSingle {
 
        @Override
        public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               return new SingleInputPlanNode(node, "Filter 
("+node.getPactContract().getName()+")", in, DriverStrategy.FLAT_MAP);
+               return new SingleInputPlanNode(node, "Filter 
("+node.getOperator().getName()+")", in, DriverStrategy.FLAT_MAP);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
index 5384aa6..b915e45 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
@@ -42,7 +42,7 @@ public class FlatMapDescriptor extends 
OperatorDescriptorSingle {
 
        @Override
        public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               return new SingleInputPlanNode(node, "FlatMap 
("+node.getPactContract().getName()+")", in, DriverStrategy.FLAT_MAP);
+               return new SingleInputPlanNode(node, "FlatMap 
("+node.getOperator().getName()+")", in, DriverStrategy.FLAT_MAP);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
index d0e839a..b648386 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
@@ -71,12 +71,12 @@ public final class GroupCombineProperties extends 
OperatorDescriptorSingle {
 
        @Override
        public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               
node.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
+               node.setDegreeOfParallelism(in.getSource().getParallelism());
                
                // sorting key info
                SingleInputPlanNode singleInputPlanNode = new 
SingleInputPlanNode(
                                node, 
-                               "GroupCombine (" + 
node.getPactContract().getName() + ")",
+                               "GroupCombine (" + node.getOperator().getName() 
+ ")",
                                in, // reuse the combine strategy also used in 
the group reduce
                                DriverStrategy.SORTED_GROUP_COMBINE, 
this.keyList);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
index c66321d..ebd09f2 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
@@ -85,7 +85,7 @@ public final class GroupReduceProperties extends 
OperatorDescriptorSingle {
 
        @Override
        public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               return new SingleInputPlanNode(node, "GroupReduce 
("+node.getPactContract().getName()+")", in, 
DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
+               return new SingleInputPlanNode(node, "GroupReduce 
("+node.getOperator().getName()+")", in, DriverStrategy.SORTED_GROUP_REDUCE, 
this.keyList);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
index 1caee6c..c4f47d3 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
@@ -98,7 +98,7 @@ public final class GroupReduceWithCombineProperties extends 
OperatorDescriptorSi
                                
in.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
                                                                        
in.getLocalStrategySortOrder());
                        }
-                       return new SingleInputPlanNode(node, 
"Reduce("+node.getPactContract().getName()+")", in,
+                       return new SingleInputPlanNode(node, 
"Reduce("+node.getOperator().getName()+")", in,
                                                                                
        DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
                } else {
                        // non forward case. all local properties are killed 
anyways, so we can safely plug in a combiner
@@ -107,9 +107,9 @@ public final class GroupReduceWithCombineProperties extends 
OperatorDescriptorSi
 
                        // create an input node for combine with same DOP as 
input node
                        GroupReduceNode combinerNode = ((GroupReduceNode) 
node).getCombinerUtilityNode();
-                       
combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
+                       
combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
 
-                       SingleInputPlanNode combiner = new 
SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract()
+                       SingleInputPlanNode combiner = new 
SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
                                        .getName()+")", toCombiner, 
DriverStrategy.SORTED_GROUP_COMBINE);
                        combiner.setCosts(new Costs(0, 0));
                        
combiner.initProperties(toCombiner.getGlobalProperties(), 
toCombiner.getLocalProperties());
@@ -124,7 +124,7 @@ public final class GroupReduceWithCombineProperties extends 
OperatorDescriptorSi
                        toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, 
in.getLocalStrategyKeys(),
                                                                                
in.getLocalStrategySortOrder());
 
-                       return new SingleInputPlanNode(node, "Reduce 
("+node.getPactContract().getName()+")",
+                       return new SingleInputPlanNode(node, "Reduce 
("+node.getOperator().getName()+")",
                                                                                
        toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
index a6c4500..fec72a9 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
@@ -79,7 +79,7 @@ public class HashJoinBuildFirstProperties extends 
AbstractJoinDescriptor {
                else {
                        strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST;
                }
-               return new DualInputPlanNode(node, 
"Join("+node.getPactContract().getName()+")", in1, in2, strategy, this.keys1, 
this.keys2);
+               return new DualInputPlanNode(node, 
"Join("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, 
this.keys2);
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
index 79cb3cb..f9d1e6c 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
@@ -78,7 +78,7 @@ public final class HashJoinBuildSecondProperties extends 
AbstractJoinDescriptor
                else {
                        strategy = DriverStrategy.HYBRIDHASH_BUILD_SECOND;
                }
-               return new DualInputPlanNode(node, "Join 
("+node.getPactContract().getName()+")", in1, in2, strategy, this.keys1, 
this.keys2);
+               return new DualInputPlanNode(node, "Join 
("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, 
this.keys2);
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
index e55a728..9f14d2a 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
@@ -41,7 +41,7 @@ public class MapDescriptor extends OperatorDescriptorSingle {
 
        @Override
        public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               return new SingleInputPlanNode(node, "Map 
("+node.getPactContract().getName()+")", in, DriverStrategy.MAP);
+               return new SingleInputPlanNode(node, "Map 
("+node.getOperator().getName()+")", in, DriverStrategy.MAP);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
index 8cef12d..1489097 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
@@ -41,7 +41,7 @@ public class MapPartitionDescriptor extends 
OperatorDescriptorSingle {
 
        @Override
        public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               return new SingleInputPlanNode(node, "MapPartition 
("+node.getPactContract().getName()+")", in, DriverStrategy.MAP_PARTITION);
+               return new SingleInputPlanNode(node, "MapPartition 
("+node.getOperator().getName()+")", in, DriverStrategy.MAP_PARTITION);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
index fe9302a..2bde29b 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
@@ -48,10 +48,10 @@ public final class PartialGroupProperties extends 
OperatorDescriptorSingle {
        @Override
        public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
                // create in input node for combine with same DOP as input node
-               GroupReduceNode combinerNode = new 
GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getPactContract());
-               
combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
+               GroupReduceNode combinerNode = new 
GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getOperator());
+               
combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
 
-               SingleInputPlanNode combiner = new 
SingleInputPlanNode(combinerNode, 
"Combine("+node.getPactContract().getName()+")", in,
+               SingleInputPlanNode combiner = new 
SingleInputPlanNode(combinerNode, "Combine("+node.getOperator().getName()+")", 
in,
                                DriverStrategy.SORTED_GROUP_COMBINE);
                // sorting key info
                combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), 
in.getLocalStrategySortOrder(), 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
index 01096f0..5bb51f3 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
@@ -61,7 +61,7 @@ public final class ReduceProperties extends 
OperatorDescriptorSingle {
                if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
                                (node.getBroadcastConnections() != null && 
!node.getBroadcastConnections().isEmpty()))
                {
-                       return new SingleInputPlanNode(node, "Reduce 
("+node.getPactContract().getName()+")", in,
+                       return new SingleInputPlanNode(node, "Reduce 
("+node.getOperator().getName()+")", in,
                                                                                
        DriverStrategy.SORTED_REDUCE, this.keyList);
                }
                else {
@@ -71,10 +71,10 @@ public final class ReduceProperties extends 
OperatorDescriptorSingle {
                        
                        // create an input node for combine with same DOP as 
input node
                        ReduceNode combinerNode = ((ReduceNode) 
node).getCombinerUtilityNode();
-                       
combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
+                       
combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
 
                        SingleInputPlanNode combiner = new 
SingleInputPlanNode(combinerNode,
-                                                               "Combine 
("+node.getPactContract().getName()+")", toCombiner,
+                                                               "Combine 
("+node.getOperator().getName()+")", toCombiner,
                                                                
DriverStrategy.SORTED_PARTIAL_REDUCE, this.keyList);
 
                        combiner.setCosts(new Costs(0, 0));
@@ -85,7 +85,7 @@ public final class ReduceProperties extends 
OperatorDescriptorSingle {
                                                                                
in.getShipStrategySortOrder(), in.getDataExchangeMode());
                        toReducer.setLocalStrategy(LocalStrategy.SORT, 
in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
 
-                       return new SingleInputPlanNode(node, 
"Reduce("+node.getPactContract().getName()+")", toReducer,
+                       return new SingleInputPlanNode(node, 
"Reduce("+node.getOperator().getName()+")", toReducer,
                                                                                
        DriverStrategy.SORTED_REDUCE, this.keyList);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
index afe7e8d..356836a 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
@@ -99,7 +99,7 @@ public class SortMergeJoinDescriptor extends 
AbstractJoinDescriptor {
                        inputOrders = tmp;
                }
                
-               return new DualInputPlanNode(node, 
"Join("+node.getPactContract().getName()+")", in1, in2, DriverStrategy.MERGE, 
this.keys1, this.keys2, inputOrders);
+               return new DualInputPlanNode(node, 
"Join("+node.getOperator().getName()+")", in1, in2, DriverStrategy.MERGE, 
this.keys1, this.keys2, inputOrders);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
index 093e968..bf22fb3 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
@@ -42,7 +42,7 @@ public class BinaryUnionPlanNode extends DualInputPlanNode {
                this.nodeCosts = toSwapFrom.nodeCosts;
                this.cumulativeCosts = toSwapFrom.cumulativeCosts;
                
-               setDegreeOfParallelism(toSwapFrom.getDegreeOfParallelism());
+               setParallelism(toSwapFrom.getParallelism());
        }
        
        public BinaryUnionNode getOptimizerNode() {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java
index 8a2398c..875d1c3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java
@@ -37,7 +37,30 @@ import 
org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 
 /**
- * A Channel is a data exchange between two operators.
+ * A Channel represents the result produced by an operator and the data 
exchange
+ * before the consumption by the target operator.
+ *
+ * The channel defines and tracks various properties and characteristics of the
+ * data set and data exchange.
+ *
+ * Data set characteristics:
+ * <ul>
+ *     <li>The "global properties" of the data, i.e., how the data is 
distributed across
+ *         partitions</li>
+ *     <li>The "required global properties" of the data, i.e., the global 
properties that, if absent,
+ *         would cause the program to return a wrong result.</li>
+ *     <li>The "local properties" of the data, i.e., how the data is organized 
within a partition</li>
+ *     <li>The "required local properties" of the data, i.e., the local 
properties that, if absent,
+ *         would cause the program to return a wrong result.</li>
+ * </ul>
+ *
+ * Data exchange parameters:
+ * <ul>
+ *     <li>The "ship strategy", i.e., whether to forward the data, shuffle it, 
broadcast it, ...</li>
+ *     <li>The "ship keys", which are the positions of the key fields in the 
exchanged records.</li>
+ *     <li>The "data exchange mode", which defines whether to pipeline or 
batch the exchange</li>
+ *     <li>Several more...</li>
+ * </ul>
  */
 public class Channel implements EstimateProvider, Cloneable, 
DumpableConnection<PlanNode> {
        

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
index eea9b67..01c56dd 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
@@ -81,10 +81,10 @@ public class DualInputPlanNode extends PlanNode {
                this.sortOrders = driverSortOrders;
                
                if (this.input1.getShipStrategy() == 
ShipStrategyType.BROADCAST) {
-                       
this.input1.setReplicationFactor(getDegreeOfParallelism());
+                       this.input1.setReplicationFactor(getParallelism());
                }
                if (this.input2.getShipStrategy() == 
ShipStrategyType.BROADCAST) {
-                       
this.input2.setReplicationFactor(getDegreeOfParallelism());
+                       this.input2.setReplicationFactor(getParallelism());
                }
                
                mergeBranchPlanMaps(input1.getSource(), input2.getSource());

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
index e565909..6f634fb 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer.plan;
 
 import java.util.ArrayList;
@@ -73,7 +72,7 @@ public abstract class PlanNode implements 
Visitable<PlanNode>, DumpableNode<Plan
        
        private double relativeMemoryPerSubTask;                                
        // the amount of memory dedicated to each task, in bytes
        
-       private int degreeOfParallelism;
+       private int parallelism;
        
        private boolean pFlag;                                                  
// flag for the internal pruning algorithm
        
@@ -86,7 +85,7 @@ public abstract class PlanNode implements 
Visitable<PlanNode>, DumpableNode<Plan
                this.nodeName = nodeName;
                this.driverStrategy = strategy;
                
-               this.degreeOfParallelism = template.getDegreeOfParallelism();
+               this.parallelism = template.getParallelism();
 
                // check, if there is branch at this node. if yes, this 
candidate must be associated with
                // the branching template node.
@@ -134,21 +133,21 @@ public abstract class PlanNode implements 
Visitable<PlanNode>, DumpableNode<Plan
        // 
--------------------------------------------------------------------------------------------
        
        /**
-        * Gets the optimizer's pact node for which this plan candidate node 
was created.
+        * Gets the node from the optimizer DAG for which this plan candidate 
node was created.
         * 
-        * @return The template optimizer's node.
+        * @return The optimizer's DAG node.
         */
        public OptimizerNode getOriginalOptimizerNode() {
                return this.template;
        }
        
        /**
-        * Gets the pact contract this node represents in the plan.
+        * Gets the program operator that this node represents in the plan.
         * 
-        * @return The pact contract this node represents in the plan.
+        * @return The program operator this node represents in the plan.
         */
-       public Operator<?> getPactContract() {
-               return this.template.getPactContract();
+       public Operator<?> getProgramOperator() {
+               return this.template.getOperator();
        }
        
        /**
@@ -252,7 +251,7 @@ public abstract class PlanNode implements 
Visitable<PlanNode>, DumpableNode<Plan
                        return null;
                } else {
                        Costs result = cumulativeCosts.clone();
-                       if (this.template != null && 
this.template.getOutgoingConnections() != null) {
+                       if (this.template.getOutgoingConnections() != null) {
                                int outDegree = 
this.template.getOutgoingConnections().size();
                                if (outDegree > 0) {
                                        result.divideBy(outDegree);
@@ -302,12 +301,12 @@ public abstract class PlanNode implements 
Visitable<PlanNode>, DumpableNode<Plan
                }
        }
        
-       public void setDegreeOfParallelism(int parallelism) {
-               this.degreeOfParallelism = parallelism;
+       public void setParallelism(int parallelism) {
+               this.parallelism = parallelism;
        }
        
-       public int getDegreeOfParallelism() {
-               return this.degreeOfParallelism;
+       public int getParallelism() {
+               return this.parallelism;
        }
        
        public long getGuaranteedAvailableMemory() {
@@ -514,7 +513,7 @@ public abstract class PlanNode implements 
Visitable<PlanNode>, DumpableNode<Plan
 
        @Override
        public String toString() {
-               return this.template.getName() + " \"" + 
getPactContract().getName() + "\" : " + this.driverStrategy +
+               return this.template.getName() + " \"" + 
getProgramOperator().getName() + "\" : " + this.driverStrategy +
                                " [[ " + this.globalProps + " ]] [[ " + 
this.localProps + " ]]";
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
index cefd704..b928be7 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
@@ -81,7 +81,7 @@ public class SingleInputPlanNode extends PlanNode {
                }
                
                if (this.input.getShipStrategy() == ShipStrategyType.BROADCAST) 
{
-                       
this.input.setReplicationFactor(getDegreeOfParallelism());
+                       this.input.setReplicationFactor(getParallelism());
                }
                
                final PlanNode predNode = input.getSource();

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
index 4083a2a..6f918c0 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
@@ -39,7 +39,7 @@ import org.apache.flink.optimizer.dag.BulkIterationNode;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.dag.DataSourceNode;
 import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.PactConnection;
+import org.apache.flink.optimizer.dag.DagConnection;
 import org.apache.flink.optimizer.dag.TempMode;
 import org.apache.flink.optimizer.dag.WorksetIterationNode;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -231,18 +231,18 @@ public class PlanJSONDumpGenerator {
                String contents;
                if (n instanceof DataSinkNode) {
                        type = "sink";
-                       contents = n.getPactContract().toString();
+                       contents = n.getOperator().toString();
                } else if (n instanceof DataSourceNode) {
                        type = "source";
-                       contents = n.getPactContract().toString();
+                       contents = n.getOperator().toString();
                }
                else if (n instanceof BulkIterationNode) {
                        type = "bulk_iteration";
-                       contents = n.getPactContract().getName();
+                       contents = n.getOperator().getName();
                }
                else if (n instanceof WorksetIterationNode) {
                        type = "workset_iteration";
-                       contents = n.getPactContract().getName();
+                       contents = n.getOperator().getName();
                }
                else if (n instanceof BinaryUnionNode) {
                        type = "pact";
@@ -250,7 +250,7 @@ public class PlanJSONDumpGenerator {
                }
                else {
                        type = "pact";
-                       contents = n.getPactContract().getName();
+                       contents = n.getOperator().getName();
                }
                
                contents = StringUtils.showControlCharacters(contents);
@@ -277,7 +277,7 @@ public class PlanJSONDumpGenerator {
 
                // degree of parallelism
                writer.print(",\n\t\t\"parallelism\": \""
-                       + (n.getDegreeOfParallelism() >= 1 ? 
n.getDegreeOfParallelism() : "default") + "\"");
+                       + (n.getParallelism() >= 1 ? n.getParallelism() : 
"default") + "\"");
                
                // output node predecessors
                Iterator<? extends DumpableConnection<?>> inConns = 
node.getDumpableInputs().iterator();
@@ -294,10 +294,10 @@ public class PlanJSONDumpGenerator {
                                writer.print(inputNum == 0 ? "\n" : ",\n");
                                if (inputNum == 0) {
                                        child1name += child1name.length() > 0 ? 
", " : ""; 
-                                       child1name += 
source.getOptimizerNode().getPactContract().getName();
+                                       child1name += 
source.getOptimizerNode().getOperator().getName();
                                } else if (inputNum == 1) {
                                        child2name += child2name.length() > 0 ? 
", " : ""; 
-                                       child2name = 
source.getOptimizerNode().getPactContract().getName();
+                                       child2name = 
source.getOptimizerNode().getOperator().getName();
                                }
 
                                // output predecessor id
@@ -310,7 +310,7 @@ public class PlanJSONDumpGenerator {
                                // output shipping strategy and channel type
                                final Channel channel = (inConn instanceof 
Channel) ? (Channel) inConn : null; 
                                final ShipStrategyType shipType = channel != 
null ? channel.getShipStrategy() :
-                                               ((PactConnection) 
inConn).getShipStrategy();
+                                               ((DagConnection) 
inConn).getShipStrategy();
                                        
                                String shipStrategy = null;
                                if (shipType != null) {
@@ -588,8 +588,8 @@ public class PlanJSONDumpGenerator {
                }
 
                // output the node compiler hints
-               if (n.getPactContract().getCompilerHints() != null) {
-                       CompilerHints hints = 
n.getPactContract().getCompilerHints();
+               if (n.getOperator().getCompilerHints() != null) {
+                       CompilerHints hints = 
n.getOperator().getCompilerHints();
                        CompilerHints defaults = new CompilerHints();
 
                        String size = hints.getOutputSize() == 
defaults.getOutputSize() ? "(none)" : String.valueOf(hints.getOutputSize());

Reply via email to