http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/PactCompiler.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/PactCompiler.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/PactCompiler.java
deleted file mode 100644
index d77fe1e..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/PactCompiler.java
+++ /dev/null
@@ -1,1372 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
-import org.apache.flink.optimizer.dag.GroupCombineNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
-import org.apache.flink.optimizer.dag.SortPartitionNode;
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.GenericDataSinkBase;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.Union;
-import org.apache.flink.api.common.operators.base.BulkIterationBase;
-import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
-import org.apache.flink.api.common.operators.base.CrossOperatorBase;
-import org.apache.flink.api.common.operators.base.DeltaIterationBase;
-import org.apache.flink.api.common.operators.base.FilterOperatorBase;
-import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
-import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
-import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
-import 
org.apache.flink.api.common.operators.base.BulkIterationBase.PartialSolutionPlaceHolder;
-import 
org.apache.flink.api.common.operators.base.DeltaIterationBase.SolutionSetPlaceHolder;
-import 
org.apache.flink.api.common.operators.base.DeltaIterationBase.WorksetPlaceHolder;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.flink.optimizer.dag.BinaryUnionNode;
-import org.apache.flink.optimizer.dag.BulkIterationNode;
-import org.apache.flink.optimizer.dag.BulkPartialSolutionNode;
-import org.apache.flink.optimizer.dag.CoGroupNode;
-import org.apache.flink.optimizer.dag.CollectorMapNode;
-import org.apache.flink.optimizer.dag.CrossNode;
-import org.apache.flink.optimizer.dag.DataSinkNode;
-import org.apache.flink.optimizer.dag.DataSourceNode;
-import org.apache.flink.optimizer.dag.FilterNode;
-import org.apache.flink.optimizer.dag.FlatMapNode;
-import org.apache.flink.optimizer.dag.GroupReduceNode;
-import org.apache.flink.optimizer.dag.IterationNode;
-import org.apache.flink.optimizer.dag.MapNode;
-import org.apache.flink.optimizer.dag.MapPartitionNode;
-import org.apache.flink.optimizer.dag.JoinNode;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.PactConnection;
-import org.apache.flink.optimizer.dag.PartitionNode;
-import org.apache.flink.optimizer.dag.ReduceNode;
-import org.apache.flink.optimizer.dag.SinkJoiner;
-import org.apache.flink.optimizer.dag.SolutionSetNode;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.dag.WorksetIterationNode;
-import org.apache.flink.optimizer.dag.WorksetNode;
-import org.apache.flink.optimizer.deadlockdetect.DeadlockPreventer;
-import org.apache.flink.optimizer.plan.BinaryUnionPlanNode;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.IterationPlanNode;
-import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SinkJoinerPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plan.WorksetPlanNode;
-import org.apache.flink.optimizer.postpass.OptimizerPostPass;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Visitor;
-
-/**
- * The optimizer that takes the user specified program plan and creates an 
optimized plan that contains
- * exact descriptions about how the physical execution will take place. It 
first translates the user
- * program into an internal optimizer representation and then chooses between 
different alternatives
- * for shipping strategies and local strategies.
- * <p>
- * The basic principle is taken from optimizer works in systems such as 
Volcano/Cascades and Selinger/System-R/DB2. The
- * optimizer walks from the sinks down, generating interesting properties, and 
ascends from the sources generating
- * alternative plans, pruning against the interesting properties.
- * <p>
- * The optimizer also assigns the memory to the individual tasks. This is 
currently done in a very simple fashion: All
- * sub-tasks that need memory (e.g. reduce or join) are given an equal share 
of memory.
- */
-public class PactCompiler {
-
-       // 
------------------------------------------------------------------------
-       // Constants
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Compiler hint key for the input channel's shipping strategy. This 
String is a key to the operator's stub
-        * parameters. The corresponding value tells the compiler which 
shipping strategy to use for the input channel.
-        * If the operator has two input channels, the shipping strategy is 
applied to both input channels.
-        */
-       public static final String HINT_SHIP_STRATEGY = "INPUT_SHIP_STRATEGY";
-
-       /**
-        * Compiler hint key for the <b>first</b> input channel's shipping 
strategy. This String is a key to
-        * the operator's stub parameters. The corresponding value tells the 
compiler which shipping strategy
-        * to use for the <b>first</b> input channel. Only applicable to 
operators with two inputs.
-        */
-       public static final String HINT_SHIP_STRATEGY_FIRST_INPUT = 
"INPUT_LEFT_SHIP_STRATEGY";
-
-       /**
-        * Compiler hint key for the <b>second</b> input channel's shipping 
strategy. This String is a key to
-        * the operator's stub parameters. The corresponding value tells the 
compiler which shipping strategy
-        * to use for the <b>second</b> input channel. Only applicable to 
operators with two inputs.
-        */
-       public static final String HINT_SHIP_STRATEGY_SECOND_INPUT = 
"INPUT_RIGHT_SHIP_STRATEGY";
-
-       /**
-        * Value for the shipping strategy compiler hint that enforces a 
<b>Forward</b> strategy on the
-        * input channel, i.e. no redistribution of any kind.
-        * 
-        * @see #HINT_SHIP_STRATEGY
-        * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
-        * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
-        */
-       public static final String HINT_SHIP_STRATEGY_FORWARD = "SHIP_FORWARD";
-       
-       /**
-        * Value for the shipping strategy compiler hint that enforces a random 
repartition strategy.
-        * 
-        * @see #HINT_SHIP_STRATEGY
-        * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
-        * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
-        */
-       public static final String HINT_SHIP_STRATEGY_REPARTITION= 
"SHIP_REPARTITION";
-       
-       /**
-        * Value for the shipping strategy compiler hint that enforces a 
hash-partition strategy.
-        * 
-        * @see #HINT_SHIP_STRATEGY
-        * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
-        * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
-        */
-       public static final String HINT_SHIP_STRATEGY_REPARTITION_HASH = 
"SHIP_REPARTITION_HASH";
-       
-       /**
-        * Value for the shipping strategy compiler hint that enforces a 
range-partition strategy.
-        * 
-        * @see #HINT_SHIP_STRATEGY
-        * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
-        * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
-        */
-       public static final String HINT_SHIP_STRATEGY_REPARTITION_RANGE = 
"SHIP_REPARTITION_RANGE";
-
-       /**
-        * Value for the shipping strategy compiler hint that enforces a 
<b>broadcast</b> strategy on the
-        * input channel.
-        * 
-        * @see #HINT_SHIP_STRATEGY
-        * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
-        * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
-        */
-       public static final String HINT_SHIP_STRATEGY_BROADCAST = 
"SHIP_BROADCAST";
-
-       /**
-        * Compiler hint key for the operator's local strategy. This String is 
a key to the operator's stub
-        * parameters. The corresponding value tells the compiler which local 
strategy to use to process the
-        * data inside one partition.
-        * <p>
-        * This hint is ignored by operators that do not have a local strategy 
(such as <i>Map</i>), or by operators that
-        * have no choice in their local strategy (such as <i>Cross</i>).
-        */
-       public static final String HINT_LOCAL_STRATEGY = "LOCAL_STRATEGY";
-
-       /**
-        * Value for the local strategy compiler hint that enforces a <b>sort 
based</b> local strategy.
-        * For example, a <i>Reduce</i> operator will sort the data to group it.
-        * 
-        * @see #HINT_LOCAL_STRATEGY
-        */
-       public static final String HINT_LOCAL_STRATEGY_SORT = 
"LOCAL_STRATEGY_SORT";
-       
-       /**
-        * Value for the local strategy compiler hint that enforces a <b>sort 
based</b> local strategy.
-        * During sorting a combine method is repeatedly applied to reduce the 
data volume.
-        * For example, a <i>Reduce</i> operator will sort the data to group it.
-        * 
-        * @see #HINT_LOCAL_STRATEGY
-        */
-       public static final String HINT_LOCAL_STRATEGY_COMBINING_SORT = 
"LOCAL_STRATEGY_COMBINING_SORT";
-       
-       /**
-        * Value for the local strategy compiler hint that enforces a <b>sort 
merge based</b> local strategy on both
-        * inputs with subsequent merging of inputs. 
-        * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a 
sort-merge strategy to find pairs 
-        * of matching keys.
-        * 
-        * @see #HINT_LOCAL_STRATEGY
-        */
-       public static final String HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE = 
"LOCAL_STRATEGY_SORT_BOTH_MERGE";
-       
-       /**
-        * Value for the local strategy compiler hint that enforces a <b>sort 
merge based</b> local strategy.
-        * The first input is sorted, the second input is assumed to be sorted. 
After sorting both inputs are merged.
-        * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a 
sort-merge strategy to find pairs 
-        * of matching keys.
-        * 
-        * @see #HINT_LOCAL_STRATEGY
-        */
-       public static final String HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE = 
"LOCAL_STRATEGY_SORT_FIRST_MERGE";
-       
-       /**
-        * Value for the local strategy compiler hint that enforces a <b>sort 
merge based</b> local strategy.
-        * The second input is sorted, the first input is assumed to be sorted. 
After sorting both inputs are merged.
-        * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a 
sort-merge strategy to find pairs 
-        * of matching keys.
-        * 
-        * @see #HINT_LOCAL_STRATEGY
-        */
-       public static final String HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE = 
"LOCAL_STRATEGY_SORT_SECOND_MERGE";
-       
-       /**
-        * Value for the local strategy compiler hint that enforces a <b>merge 
based</b> local strategy.
-        * Both inputs are assumed to be sorted and are merged. 
-        * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a 
merge strategy to find pairs 
-        * of matching keys.
-        * 
-        * @see #HINT_LOCAL_STRATEGY
-        */
-       public static final String HINT_LOCAL_STRATEGY_MERGE = 
"LOCAL_STRATEGY_MERGE";
-
-       
-       /**
-        * Value for the local strategy compiler hint that enforces a <b>hash 
based</b> local strategy.
-        * For example, a <i>Match</i> operator will use a hybrid-hash-join 
strategy to find pairs of
-        * matching keys. The <b>first</b> input will be used to build the hash 
table, the second input will be
-        * used to probe the table.
-        * 
-        * @see #HINT_LOCAL_STRATEGY
-        */
-       public static final String HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST = 
"LOCAL_STRATEGY_HASH_BUILD_FIRST";
-
-       /**
-        * Value for the local strategy compiler hint that enforces a <b>hash 
based</b> local strategy.
-        * For example, a <i>Match</i> operator will use a hybrid-hash-join 
strategy to find pairs of
-        * matching keys. The <b>second</b> input will be used to build the 
hash table, the first input will be
-        * used to probe the table.
-        * 
-        * @see #HINT_LOCAL_STRATEGY
-        */
-       public static final String HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND = 
"LOCAL_STRATEGY_HASH_BUILD_SECOND";
-
-       /**
-        * Value for the local strategy compiler hint that chooses the outer 
side of the <b>nested-loop</b> local strategy.
-        * A <i>Cross</i> operator will process the data of the <b>first</b> 
input in the outer-loop of the nested loops.
-        * Hence, the data of the first input is streamed through, while the data of the second input is stored on
-        * disk and repeatedly read.
-        * 
-        * @see #HINT_LOCAL_STRATEGY
-        */
-       public static final String 
HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST = 
"LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST";
-
-       /**
-        * Value for the local strategy compiler hint that chooses the outer 
side of the <b>nested-loop</b> local strategy.
-        * A <i>Cross</i> operator will process the data of the <b>second</b> 
input in the outer-loop of the nested loops.
-        * Hence, the data of the second input is streamed through, while the data of the first input is stored on
-        * disk and repeatedly read.
-        * 
-        * @see #HINT_LOCAL_STRATEGY
-        */
-       public static final String 
HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND = 
"LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND";
-
-       /**
-        * Value for the local strategy compiler hint that chooses the outer 
side of the <b>nested-loop</b> local strategy.
-        * A <i>Cross</i> operator will process the data of the <b>first</b> 
input in the outer-loop of the nested loops.
-        * Furthermore, the first input, being the outer side, will be processed in blocks, and for each block, the
-        * second input, being the inner side, will be read repeatedly from disk.
-        * 
-        * @see #HINT_LOCAL_STRATEGY
-        */
-       public static final String 
HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST = 
"LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST";
-
-       /**
-        * Value for the local strategy compiler hint that chooses the outer 
side of the <b>nested-loop</b> local strategy.
-        * A <i>Cross</i> operator will process the data of the <b>second</b> 
input in the outer-loop of the nested loops.
-        * Furthermore, the second input, being the outer side, will be processed in blocks, and for each block, the
-        * first input, being the inner side, will be read repeatedly from disk.
-        * 
-        * @see #HINT_LOCAL_STRATEGY
-        */
-       public static final String 
HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND = 
"LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND";
-       
-       /**
-        * The log handle that is used by the compiler to log messages.
-        */
-       public static final Logger LOG = 
LoggerFactory.getLogger(PactCompiler.class);
-
-       // 
------------------------------------------------------------------------
-       // Members
-       // 
------------------------------------------------------------------------
-
-       /**
-        * The statistics object used to obtain statistics, such as input sizes,
-        * for the cost estimation process.
-        */
-       private final DataStatistics statistics;
-
-       /**
-        * The cost estimator used by the compiler.
-        */
-       private final CostEstimator costEstimator;
-
-       /**
-        * The default degree of parallelism for jobs compiled by this compiler.
-        */
-       private int defaultDegreeOfParallelism;
-
-
-       // 
------------------------------------------------------------------------
-       // Constructor & Setup
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates a new optimizer instance. The optimizer has no access to 
statistics about the
-        * inputs and can hence not determine any properties. It will perform 
all optimization with
-        * unknown sizes and hence use only the heuristic cost functions, which 
result in the selection
-        * of the most robust execution strategies.
-        */
-       public PactCompiler() {
-               // no statistics object (null) and the default cost estimator
-               this(null, new DefaultCostEstimator());
-       }
-
-       /**
-        * Creates a new optimizer instance that uses the statistics object to 
determine properties about the input.
-        * Given those statistics, the optimizer can make better choices for 
the execution strategies.
-        * 
-        * @param stats
-        *        The statistics to be used to determine the input properties.
-        */
-       public PactCompiler(DataStatistics stats) {
-               // the given statistics combined with the default cost estimator
-               this(stats, new DefaultCostEstimator());
-       }
-
-       /**
-        * Creates a new optimizer instance. The optimizer has no access to 
statistics about the
-        * inputs and can hence not determine any properties. It will perform 
all optimization with
-        * unknown sizes and hence use only the heuristic cost functions, which 
result in the selection
-        * of the most robust execution strategies.
-        *
-        * The optimizer uses the given cost estimator to compute the costs of 
the individual operations.
-        * 
-        * @param estimator The cost estimator to use to cost the individual 
operations.
-        */
-       public PactCompiler(CostEstimator estimator) {
-               // no statistics object (null); costs come from the caller's estimator
-               this(null, estimator);
-       }
-
-       /**
-        * Creates a new optimizer instance that uses the statistics object to 
determine properties about the input.
-        * Given those statistics, the optimizer can make better choices for 
the execution strategies.
-        *
-        * The optimizer uses the given cost estimator to compute the costs of 
the individual operations.
-        * 
-        * @param stats
-        *        The statistics to be used to determine the input properties.
-        * @param estimator
-        *        The <tt>CostEstimator</tt> to use to cost the individual 
operations.
-        */
-       public PactCompiler(DataStatistics stats, CostEstimator estimator) {
-               this.statistics = stats;
-               this.costEstimator = estimator;
-
-               // read the default parallelism from the global configuration
-               final int configuredParallelism = GlobalConfiguration.getInteger(
-                               ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
-                               ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE);
-
-               if (configuredParallelism < 1) {
-                       // identify the option by its configuration key, not by its default value,
-                       // so that the warning tells the user which setting to correct
-                       LOG.warn("Config value {} for option {} is invalid. Ignoring and using a value of 1.",
-                                       configuredParallelism, ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY);
-                       this.defaultDegreeOfParallelism = 1;
-               } else {
-                       this.defaultDegreeOfParallelism = configuredParallelism;
-               }
-       }
-       
-       // 
------------------------------------------------------------------------
-       //                             Getters / Setters
-       // 
------------------------------------------------------------------------
-       
-       /**
-        * Gets the default degree of parallelism of this compiler. The value is used for programs
-        * that do not define a positive default parallelism themselves.
-        *
-        * @return The default degree of parallelism.
-        */
-       public int getDefaultDegreeOfParallelism() {
-               return defaultDegreeOfParallelism;
-       }
-       
-       /**
-        * Sets the default degree of parallelism of this compiler.
-        *
-        * @param defaultDegreeOfParallelism The new default parallelism, must be greater than zero.
-        * @throws IllegalArgumentException Thrown, if the given value is zero or negative.
-        */
-       public void setDefaultDegreeOfParallelism(int defaultDegreeOfParallelism) {
-               // guard clause: reject invalid values before touching any state
-               if (defaultDegreeOfParallelism <= 0) {
-                       throw new IllegalArgumentException("Default parallelism cannot be zero or negative.");
-               }
-               this.defaultDegreeOfParallelism = defaultDegreeOfParallelism;
-       }
-       
-       // 
------------------------------------------------------------------------
-       //                               Compilation
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Translates the given program to an OptimizedPlan, where all nodes 
have their local strategy assigned
-        * and all channels have a shipping strategy assigned.
-        *
-        * For more details on the optimization phase, see the comments for
-        * {@link #compile(org.apache.flink.api.common.Plan, 
org.apache.flink.optimizer.postpass.OptimizerPostPass)}.
-        * 
-        * @param program The program to be translated.
-        * @return The optimized plan.
-        *
-        * @throws CompilerException
-        *         Thrown, if the plan is invalid or the optimizer encountered 
an inconsistent
-        *         situation during the compilation process.
-        */
-       public OptimizedPlan compile(Plan program) throws CompilerException {
-               // the post pass is obtained from the plan itself and handed to the full compilation routine
-               return compile(program, getPostPassFromPlan(program));
-       }
-
-       /**
-        * Translates the given program to an OptimizedPlan. The optimized plan 
describes for each operator
-        * which strategy to use (such as hash join versus sort-merge join), 
what data exchange method to use
-        * (local pipe forward, shuffle, broadcast), what exchange mode to use 
(pipelined, batch),
-        * where to cache intermediate results, etc.
-        *
-        * The optimization happens in multiple phases:
-        * <ol>
-        * <li>Create the optimizer DAG representation of the program: build <tt>OptimizerNode</tt> representations
-        * of the PACTs, assign parallelism and compute size estimates.</li>
-        * <li>Compute interesting properties and auxiliary structures.</li>
-        * <li>Enumerate plan alternatives. This cannot be done in the same 
step as the interesting property computation (as
-        * opposed to the Database approaches), because we support plans that 
are not trees.</li>
-        * </ol>
-        * 
-        * @param program The program to be translated.
-        * @param postPasser The function to be used for post passing the 
optimizer's plan and setting the
-        *                   data type specific serialization routines.
-        * @return The optimized plan.
-        * 
-        * @throws CompilerException
-        *         Thrown, if the plan is invalid or the optimizer encountered 
an inconsistent
-        *         situation during the compilation process.
-        */
-       private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser) throws CompilerException {
-               // reject missing arguments up front and name the offending one
-               if (program == null) {
-                       throw new NullPointerException("The program to compile must not be null.");
-               }
-               if (postPasser == null) {
-                       throw new NullPointerException("The optimizer post pass must not be null.");
-               }
-
-               LOG.debug("Beginning compilation of program '{}'", program.getJobName());
-
-               final ExecutionMode defaultDataExchangeMode = program.getExecutionConfig().getExecutionMode();
-
-               // a positive parallelism set on the program takes precedence over this compiler's default
-               final int defaultParallelism = program.getDefaultParallelism() > 0 ?
-                       program.getDefaultParallelism() : this.defaultDegreeOfParallelism;
-
-               // log the default settings
-               LOG.debug("Using a default parallelism of {}", defaultParallelism);
-               LOG.debug("Using default data exchange mode {}", defaultDataExchangeMode);
-
-               // the first step in the compilation is to create the optimizer plan representation
-               // this step does the following:
-               // 1) It creates an optimizer plan node for each operator
-               // 2) It connects them via channels
-               // 3) It looks for hints about local strategies and channel types and
-               // sets the types and strategies accordingly
-               // 4) It makes estimates about the data volume of the data sources and
-               // propagates those estimates through the plan
-
-               GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism, defaultDataExchangeMode);
-               program.accept(graphCreator);
-
-               // if we have a plan with multiple data sinks, add logical optimizer nodes that have two data-sinks as children
-               // each until we have only a single root node. This allows to transparently deal with the nodes with
-               // multiple outputs
-               OptimizerNode rootNode;
-               if (graphCreator.sinks.size() == 1) {
-                       rootNode = graphCreator.sinks.get(0);
-               } else if (graphCreator.sinks.size() > 1) {
-                       Iterator<DataSinkNode> iter = graphCreator.sinks.iterator();
-                       rootNode = iter.next();
-
-                       // fold all sinks into a left-deep chain of sink joiners
-                       while (iter.hasNext()) {
-                               rootNode = new SinkJoiner(rootNode, iter.next());
-                       }
-               } else {
-                       throw new CompilerException("Bug: The optimizer plan representation has no sinks.");
-               }
-
-               // now that we have all nodes created and recorded which ones consume memory, tell the nodes their minimal
-               // guaranteed memory, for further cost estimations. we assume an equal distribution of memory among consumer tasks
-               rootNode.accept(new IdAndEstimatesVisitor(this.statistics));
-
-               // We are dealing with operator DAGs, rather than operator trees.
-               // That requires us to deviate at some points from the classical DB optimizer algorithms.
-               // This step build some auxiliary structures to help track branches and joins in the DAG
-               BranchesVisitor branchingVisitor = new BranchesVisitor();
-               rootNode.accept(branchingVisitor);
-
-               // Propagate the interesting properties top-down through the graph
-               InterestingPropertyVisitor propsVisitor = new InterestingPropertyVisitor(this.costEstimator);
-               rootNode.accept(propsVisitor);
-
-               // perform a sanity check: the root may not have any unclosed branches
-               if (rootNode.getOpenBranches() != null && rootNode.getOpenBranches().size() > 0) {
-                       throw new CompilerException("Bug: Logic for branching plans (non-tree plans) has an error, and does not " +
-                                       "track the re-joining of branches correctly.");
-               }
-
-               // the final step is now to generate the actual plan alternatives
-               List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
-
-               // exactly one plan must remain; report the actual count, since zero plans is an error as well
-               if (bestPlan.size() != 1) {
-                       throw new CompilerException("Error in compiler: expected exactly one best plan, but "
-                                       + bestPlan.size() + " were created!");
-               }
-
-               // check if the best plan's root is a data sink (single sink plan)
-               // if so, directly take it. if it is a sink joiner node, get its contained sinks
-               PlanNode bestPlanRoot = bestPlan.get(0);
-               List<SinkPlanNode> bestPlanSinks = new ArrayList<SinkPlanNode>(4);
-
-               if (bestPlanRoot instanceof SinkPlanNode) {
-                       bestPlanSinks.add((SinkPlanNode) bestPlanRoot);
-               } else if (bestPlanRoot instanceof SinkJoinerPlanNode) {
-                       ((SinkJoinerPlanNode) bestPlanRoot).getDataSinks(bestPlanSinks);
-               }
-
-               DeadlockPreventer dp = new DeadlockPreventer();
-               dp.resolveDeadlocks(bestPlanSinks);
-
-               // finalize the plan
-               OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program);
-
-               plan.accept(new BinaryUnionReplacer());
-
-               // post pass the plan. this is the phase where the serialization and comparator code is set
-               postPasser.postPass(plan);
-
-               return plan;
-       }
-
-       /**
-        * This function performs only the first step to the compilation 
process - the creation of the optimizer
-        * representation of the plan. No estimations or enumerations of 
alternatives are done here.
-        * 
-        * @param program The plan to generate the optimizer representation for.
-        * @return The optimizer representation of the plan, as a collection of all data sinks, from which
-        *         the plan can be traversed.
-        */
-       public static List<DataSinkNode> createPreOptimizedPlan(Plan program) {
-               // fixed settings for this translation: a parallelism of 1 and no default data exchange mode
-               final GraphCreatingVisitor visitor = new GraphCreatingVisitor(1, null);
-               program.accept(visitor);
-
-               // the sinks collected during the traversal are the result
-               return visitor.getSinks();
-       }
-       
-       // 
------------------------------------------------------------------------
-       //                 Visitors for Compilation Traversals
-       // 
------------------------------------------------------------------------
-       
-       /**
-        * This utility class performs the translation from the user specified 
program to the optimizer plan.
-        * It works as a visitor that walks the user's job in a depth-first fashion. During the descent, it creates
-        * an optimizer node for each operator, data source, or data sink. During the ascent, it connects
-        * the nodes to the full graph.
-        * <p>
-        * This translator relies on the <code>setInputs</code> method in the 
nodes. As that method implements the size
-        * estimation and the awareness for optimizer hints, the sizes will be 
properly estimated and the translated plan
-        * already respects all optimizer hints.
-        */
-       public static final class GraphCreatingVisitor implements 
Visitor<Operator<?>> {
-               
-               private final Map<Operator<?>, OptimizerNode> con2node; // map 
from the operator objects to their
-                                                                               
                                                // corresponding optimizer nodes
-
-               private final List<DataSinkNode> sinks; // all data sink nodes 
in the optimizer plan
-
-               private final int defaultParallelism; // the default degree of 
parallelism
-               
-               private final GraphCreatingVisitor parent;      // reference to 
enclosing creator, in case of a recursive translation
-
-               private final ExecutionMode defaultDataExchangeMode;
-
-               private final boolean forceDOP;
-
-               
-               /**
-                * Creates a top-level visitor: it has no parent visitor, does not force the
-                * parallelism, and starts with an empty operator-to-node map.
-                *
-                * @param defaultParallelism The parallelism given to nodes whose operator
-                *                           does not define a parallelism itself.
-                * @param defaultDataExchangeMode The mode passed to the nodes when their
-                *                                inputs are connected.
-                */
-               public GraphCreatingVisitor(int defaultParallelism, 
ExecutionMode defaultDataExchangeMode) {
-                       this(null, false, defaultParallelism, 
defaultDataExchangeMode, null);
-               }
-
-               /**
-                * Creates a visitor, optionally nested inside another visitor.
-                *
-                * @param parent The enclosing visitor, or null for a top-level translation.
-                * @param forceDOP If true, an operator parallelism that differs from the
-                *                 default parallelism is replaced by the default parallelism.
-                * @param defaultParallelism The parallelism given to nodes whose operator
-                *                           does not define a parallelism itself.
-                * @param dataExchangeMode The mode passed to the nodes when their inputs
-                *                         are connected.
-                * @param closure The operator-to-node map to work on, or null to start with
-                *                an empty map. The given map is used directly, not copied.
-                */
-               private GraphCreatingVisitor(GraphCreatingVisitor parent, 
boolean forceDOP, int defaultParallelism,
-                                                                       
ExecutionMode dataExchangeMode, HashMap<Operator<?>, OptimizerNode> closure) {
-                       if (closure == null){
-                               con2node = new HashMap<Operator<?>, 
OptimizerNode>();
-                       } else {
-                               con2node = closure;
-                       }
-
-                       this.sinks = new ArrayList<DataSinkNode>(2);
-                       this.defaultParallelism = defaultParallelism;
-                       this.parent = parent;
-                       this.defaultDataExchangeMode = dataExchangeMode;
-                       this.forceDOP = forceDOP;
-               }
-
-               /**
-                * Gets the data sink nodes that this visitor has created so far.
-                *
-                * @return The visitor's internal list of sink nodes (not a copy).
-                */
-               public List<DataSinkNode> getSinks() {
-                       return sinks;
-               }
-
-               /**
-                * Creates the optimizer node for the given operator.
-                * <p>
-                * Operators that already have an entry in the operator-to-node map are
-                * skipped. Otherwise, a node matching the operator's type is created and
-                * registered in the map; data sink nodes are additionally added to the list
-                * of sinks. Placeholders for the partial solution, the workset, and the
-                * solution set are bound to the iteration node that is looked up in the
-                * parent visitor's map, and take over that node's degree of parallelism.
-                * <p>
-                * Nodes that have no parallelism yet (value below 1) get the operator's
-                * parallelism. If the operator defines none, or if the parallelism is forced
-                * and differs from the default, the default parallelism is used instead.
-                *
-                * @param c The operator to create the node for.
-                * @return True, if a node was created for the operator, false if the
-                *         operator already had a node.
-                * @throws InvalidProgramException Thrown, if an iteration placeholder is found
-                *         by a visitor that has no parent visitor.
-                * @throws IllegalArgumentException Thrown, if the operator type is unknown.
-                */
-               @SuppressWarnings("deprecation")
-               @Override
-               public boolean preVisit(Operator<?> c) {
-                       // check if we have been here before
-                       if (this.con2node.containsKey(c)) {
-                               return false;
-                       }
-
-                       final OptimizerNode n;
-
-                       // create a node for the operator (or sink or source) 
if we have not been here before
-                       if (c instanceof GenericDataSinkBase) {
-                               DataSinkNode dsn = new 
DataSinkNode((GenericDataSinkBase<?>) c);
-                               this.sinks.add(dsn);
-                               n = dsn;
-                       }
-                       else if (c instanceof GenericDataSourceBase) {
-                               n = new 
DataSourceNode((GenericDataSourceBase<?, ?>) c);
-                       }
-                       else if (c instanceof MapOperatorBase) {
-                               n = new MapNode((MapOperatorBase<?, ?, ?>) c);
-                       }
-                       else if (c instanceof MapPartitionOperatorBase) {
-                               n = new 
MapPartitionNode((MapPartitionOperatorBase<?, ?, ?>) c);
-                       }
-                       else if (c instanceof 
org.apache.flink.api.common.operators.base.CollectorMapOperatorBase) {
-                               n = new 
CollectorMapNode((org.apache.flink.api.common.operators.base.CollectorMapOperatorBase<?,
 ?, ?>) c);
-                       }
-                       else if (c instanceof FlatMapOperatorBase) {
-                               n = new FlatMapNode((FlatMapOperatorBase<?, ?, 
?>) c);
-                       }
-                       else if (c instanceof FilterOperatorBase) {
-                               n = new FilterNode((FilterOperatorBase<?, ?>) 
c);
-                       }
-                       else if (c instanceof ReduceOperatorBase) {
-                               n = new ReduceNode((ReduceOperatorBase<?, ?>) 
c);
-                       }
-                       else if (c instanceof GroupReduceOperatorBase) {
-                               n = new 
GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) c);
-                       }
-                       else if (c instanceof GroupCombineOperatorBase) {
-                               n = new 
GroupCombineNode((GroupCombineOperatorBase<?, ?, ?>) c);
-                       }
-                       else if (c instanceof JoinOperatorBase) {
-                               n = new JoinNode((JoinOperatorBase<?, ?, ?, ?>) 
c);
-                       }
-                       else if (c instanceof CoGroupOperatorBase) {
-                               n = new CoGroupNode((CoGroupOperatorBase<?, ?, 
?, ?>) c);
-                       }
-                       else if (c instanceof CrossOperatorBase) {
-                               n = new CrossNode((CrossOperatorBase<?, ?, ?, 
?>) c);
-                       }
-                       else if (c instanceof BulkIterationBase) {
-                               n = new 
BulkIterationNode((BulkIterationBase<?>) c);
-                       }
-                       else if (c instanceof DeltaIterationBase) {
-                               n = new 
WorksetIterationNode((DeltaIterationBase<?, ?>) c);
-                       }
-                       else if (c instanceof Union){
-                               n = new BinaryUnionNode((Union<?>) c);
-                       }
-                       else if (c instanceof PartitionOperatorBase) {
-                               n = new 
PartitionNode((PartitionOperatorBase<?>) c);
-                       }
-                       else if (c instanceof SortPartitionOperatorBase) {
-                               n = new 
SortPartitionNode((SortPartitionOperatorBase<?>) c);
-                       }
-                       else if (c instanceof PartialSolutionPlaceHolder) {
-                               // NOTE(review): the message below talks about data sinks, although this
-                               // branch handles a placeholder seen by a visitor without a parent. The
-                               // same message is used for the workset and solution set placeholders
-                               // further down - confirm that the wording is intended.
-                               if (this.parent == null) {
-                                       throw new InvalidProgramException("It 
is currently not supported to create data sinks inside iterations.");
-                               }
-                               
-                               final PartialSolutionPlaceHolder<?> holder = 
(PartialSolutionPlaceHolder<?>) c;
-                               final BulkIterationBase<?> enclosingIteration = 
holder.getContainingBulkIteration();
-                               final BulkIterationNode containingIterationNode 
=
-                                                       (BulkIterationNode) 
this.parent.con2node.get(enclosingIteration);
-                               
-                               // catch this for the recursive translation of 
step functions
-                               BulkPartialSolutionNode p = new 
BulkPartialSolutionNode(holder, containingIterationNode);
-                               
p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
-                               n = p;
-                       }
-                       else if (c instanceof WorksetPlaceHolder) {
-                               if (this.parent == null) {
-                                       throw new InvalidProgramException("It 
is currently not supported to create data sinks inside iterations.");
-                               }
-                               
-                               final WorksetPlaceHolder<?> holder = 
(WorksetPlaceHolder<?>) c;
-                               final DeltaIterationBase<?, ?> 
enclosingIteration = holder.getContainingWorksetIteration();
-                               final WorksetIterationNode 
containingIterationNode =
-                                                       (WorksetIterationNode) 
this.parent.con2node.get(enclosingIteration);
-                               
-                               // catch this for the recursive translation of 
step functions
-                               WorksetNode p = new WorksetNode(holder, 
containingIterationNode);
-                               
p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
-                               n = p;
-                       }
-                       else if (c instanceof SolutionSetPlaceHolder) {
-                               if (this.parent == null) {
-                                       throw new InvalidProgramException("It 
is currently not supported to create data sinks inside iterations.");
-                               }
-                               
-                               final SolutionSetPlaceHolder<?> holder = 
(SolutionSetPlaceHolder<?>) c;
-                               final DeltaIterationBase<?, ?> 
enclosingIteration = holder.getContainingWorksetIteration();
-                               final WorksetIterationNode 
containingIterationNode =
-                                                       (WorksetIterationNode) 
this.parent.con2node.get(enclosingIteration);
-                               
-                               // catch this for the recursive translation of 
step functions
-                               SolutionSetNode p = new SolutionSetNode(holder, 
containingIterationNode);
-                               
p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
-                               n = p;
-                       }
-                       else {
-                               throw new IllegalArgumentException("Unknown 
operator type: " + c);
-                       }
-
-                       this.con2node.put(c, n);
-                       
-                       // set the parallelism only if it has not been set 
before. some nodes have a fixed DOP, such as the
-                       // key-less reducer (all-reduce)
-                       if (n.getDegreeOfParallelism() < 1) {
-                               // set the degree of parallelism
-                               int par = c.getDegreeOfParallelism();
-                               if (par > 0) {
-                                       if (this.forceDOP && par != 
this.defaultParallelism) {
-                                               par = this.defaultParallelism;
-                                               LOG.warn("The parallelism of 
nested dataflows (such as step functions in iterations) is " +
-                                                       "currently fixed to the 
parallelism of the surrounding operator (the iteration).");
-                                       }
-                               } else {
-                                       par = this.defaultParallelism;
-                               }
-                               n.setDegreeOfParallelism(par);
-                       }
-
-                       return true;
-               }
-
-               /**
-                * Connects the node of the given operator to its predecessors and, if the
-                * node is an iteration, recursively translates the step function.
-                * <p>
-                * For a bulk iteration, the step function is translated by a nested visitor
-                * that works on a copy of this visitor's operator-to-node map. The partial
-                * solution node, the root of the step function, and (if defined) the
-                * termination criterion are then set on the iteration node.
-                * <p>
-                * For a delta (workset) iteration, it is first checked that the next workset
-                * depends on the workset. The solution set delta and the next workset are
-                * then translated by a nested visitor. Each node that consumes the solution
-                * set must be exactly a JoinNode or a CoGroupNode, and is told which of its
-                * inputs is the solution set. If the solution set has no node or no consumer,
-                * a fresh solution set node is created for the iteration node.
-                * <p>
-                * In both cases, the step function is finally traversed to identify the
-                * nodes on the dynamic path.
-                *
-                * @param c The operator whose node is connected.
-                * @throws CompilerException Thrown, if the step function result does not depend
-                *         on the partial solution, or if the next workset or the solution set
-                *         delta does not depend on the workset.
-                * @throws InvalidProgramException Thrown, if the solution set is consumed by an
-                *         operation other than a join or a co-group.
-                */
-               @Override
-               public void postVisit(Operator<?> c) {
-                       
-                       OptimizerNode n = this.con2node.get(c);
-
-                       // first connect to the predecessors
-                       n.setInput(this.con2node, this.defaultDataExchangeMode);
-                       n.setBroadcastInputs(this.con2node, 
this.defaultDataExchangeMode);
-                       
-                       // if the node represents a bulk iteration, we 
recursively translate the data flow now
-                       if (n instanceof BulkIterationNode) {
-                               final BulkIterationNode iterNode = 
(BulkIterationNode) n;
-                               final BulkIterationBase<?> iter = 
iterNode.getIterationContract();
-
-                               // pass a copy of the no iterative part into 
the iteration translation,
-                               // in case the iteration references its closure
-                               HashMap<Operator<?>, OptimizerNode> closure = 
new HashMap<Operator<?>, OptimizerNode>(con2node);
-
-                               // first, recursively build the data flow for 
the step function
-                               final GraphCreatingVisitor recursiveCreator = 
new GraphCreatingVisitor(this, true,
-                                       iterNode.getDegreeOfParallelism(), 
defaultDataExchangeMode, closure);
-                               
-                               BulkPartialSolutionNode partialSolution;
-                               
-                               
iter.getNextPartialSolution().accept(recursiveCreator);
-                               
-                               partialSolution =  (BulkPartialSolutionNode) 
recursiveCreator.con2node.get(iter.getPartialSolution());
-                               OptimizerNode rootOfStepFunction = 
recursiveCreator.con2node.get(iter.getNextPartialSolution());
-                               if (partialSolution == null) {
-                                       throw new CompilerException("Error: The 
step functions result does not depend on the partial solution.");
-                               }
-                               
-                               
-                               OptimizerNode terminationCriterion = null;
-                               
-                               if (iter.getTerminationCriterion() != null) {
-                                       terminationCriterion = 
recursiveCreator.con2node.get(iter.getTerminationCriterion());
-                                       
-                                       // no intermediate node yet, traverse 
from the termination criterion to build the missing parts
-                                       if (terminationCriterion == null) {
-                                               
iter.getTerminationCriterion().accept(recursiveCreator);
-                                               terminationCriterion = 
recursiveCreator.con2node.get(iter.getTerminationCriterion());
-                                       }
-                               }
-                               
-                               iterNode.setPartialSolution(partialSolution);
-                               
iterNode.setNextPartialSolution(rootOfStepFunction, terminationCriterion);
-                               
-                               // go over the contained data flow and mark the 
dynamic path nodes
-                               StaticDynamicPathIdentifier identifier = new 
StaticDynamicPathIdentifier(iterNode.getCostWeight());
-                               iterNode.acceptForStepFunction(identifier);
-                       }
-                       else if (n instanceof WorksetIterationNode) {
-                               final WorksetIterationNode iterNode = 
(WorksetIterationNode) n;
-                               final DeltaIterationBase<?, ?> iter = 
iterNode.getIterationContract();
-
-                               // we need to ensure that both the next-workset 
and the solution-set-delta depend on the workset.
-                               // One check is for free during the 
translation, we do the other check here as a pre-condition
-                               {
-                                       StepFunctionValidator wsf = new 
StepFunctionValidator();
-                                       iter.getNextWorkset().accept(wsf);
-                                       if (!wsf.foundWorkset) {
-                                               throw new CompilerException("In 
the given program, the next workset does not depend on the workset. " +
-                                                                               
                                        "This is a prerequisite in delta 
iterations.");
-                                       }
-                               }
-                               
-                               // calculate the closure of the anonymous 
function
-                               HashMap<Operator<?>, OptimizerNode> closure = 
new HashMap<Operator<?>, OptimizerNode>(con2node);
-
-                               // first, recursively build the data flow for 
the step function
-                               final GraphCreatingVisitor recursiveCreator = 
new GraphCreatingVisitor(
-                                               this, true, 
iterNode.getDegreeOfParallelism(), defaultDataExchangeMode, closure);
-                               
-                               // descend from the solution set delta. check 
that it depends on both the workset
-                               // and the solution set. If it does depend on 
both, this descend should create both nodes
-                               
iter.getSolutionSetDelta().accept(recursiveCreator);
-                               
-                               final WorksetNode worksetNode = (WorksetNode) 
recursiveCreator.con2node.get(iter.getWorkset());
-                               
-                               if (worksetNode == null) {
-                                       // NOTE(review): the two literals below are concatenated without a
-                                       // separating space, so the message reads "...workset.This is...".
-                                       throw new CompilerException("In the 
given program, the solution set delta does not depend on the workset." +
-                                                                               
                                "This is a prerequisite in delta iterations.");
-                               }
-                               
-                               iter.getNextWorkset().accept(recursiveCreator);
-                               
-                               SolutionSetNode solutionSetNode = 
(SolutionSetNode) recursiveCreator.con2node.get(iter.getSolutionSet());
-                               
-                               if (solutionSetNode == null || 
solutionSetNode.getOutgoingConnections() == null || 
solutionSetNode.getOutgoingConnections().isEmpty()) {
-                                       solutionSetNode = new 
SolutionSetNode((SolutionSetPlaceHolder<?>) iter.getSolutionSet(), iterNode);
-                               }
-                               else {
-                                       for (PactConnection conn : 
solutionSetNode.getOutgoingConnections()) {
-                                               OptimizerNode successor = 
conn.getTarget();
-                                       
-                                               if (successor.getClass() == 
JoinNode.class) {
-                                                       // find out which input 
to the match the solution set is
-                                                       JoinNode mn = 
(JoinNode) successor;
-                                                       if 
(mn.getFirstPredecessorNode() == solutionSetNode) {
-                                                               
mn.makeJoinWithSolutionSet(0);
-                                                       } else if 
(mn.getSecondPredecessorNode() == solutionSetNode) {
-                                                               
mn.makeJoinWithSolutionSet(1);
-                                                       } else {
-                                                               throw new 
CompilerException();
-                                                       }
-                                               }
-                                               else if (successor.getClass() 
== CoGroupNode.class) {
-                                                       CoGroupNode cg = 
(CoGroupNode) successor;
-                                                       if 
(cg.getFirstPredecessorNode() == solutionSetNode) {
-                                                               
cg.makeCoGroupWithSolutionSet(0);
-                                                       } else if 
(cg.getSecondPredecessorNode() == solutionSetNode) {
-                                                               
cg.makeCoGroupWithSolutionSet(1);
-                                                       } else {
-                                                               throw new 
CompilerException();
-                                                       }
-                                               }
-                                               else {
-                                                       throw new 
InvalidProgramException(
-                                                                       "Error: 
The only operations allowed on the solution set are Join and CoGroup.");
-                                               }
-                                       }
-                               }
-                               
-                               final OptimizerNode nextWorksetNode = 
recursiveCreator.con2node.get(iter.getNextWorkset());
-                               final OptimizerNode solutionSetDeltaNode = 
recursiveCreator.con2node.get(iter.getSolutionSetDelta());
-                               
-                               // set the step function nodes to the iteration 
node
-                               iterNode.setPartialSolution(solutionSetNode, 
worksetNode);
-                               
iterNode.setNextPartialSolution(solutionSetDeltaNode, nextWorksetNode, 
defaultDataExchangeMode);
-                               
-                               // go over the contained data flow and mark the 
dynamic path nodes
-                               StaticDynamicPathIdentifier pathIdentifier = 
new StaticDynamicPathIdentifier(iterNode.getCostWeight());
-                               iterNode.acceptForStepFunction(pathIdentifier);
-                       }
-               }
-       }
-       
-       /**
-        * Visitor that lets every node of a step function identify whether it is on the
-        * dynamic path, using the cost weight of the enclosing iteration. Each node is
-        * processed only once. An iteration node that turns out to be on the dynamic
-        * path is rejected, because nested iterations are not supported.
-        */
-       private static final class StaticDynamicPathIdentifier implements 
Visitor<OptimizerNode> {
-               
-               private final Set<OptimizerNode> seenBefore = new 
HashSet<OptimizerNode>();
-               
-               private final int costWeight;
-               
-               private StaticDynamicPathIdentifier(int costWeight) {
-                       this.costWeight = costWeight;
-               }
-               
-               @Override
-               public boolean preVisit(OptimizerNode visitable) {
-                       return this.seenBefore.add(visitable);
-               }
-
-               @Override
-               public void postVisit(OptimizerNode visitable) {
-                       visitable.identifyDynamicPath(this.costWeight);
-                       
-                       // check that there is no nested iteration on the 
dynamic path
-                       if (visitable.isOnDynamicPath() && visitable instanceof 
IterationNode) {
-                               throw new CompilerException("Nested iterations 
are currently not supported.");
-                       }
-               }
-       }
-       
-       /**
-        * Visitor that assigns a unique id to every node and computes the node's output
-        * estimates from the given statistics. The ids are consecutive and start at 1.
-        * Before the estimates are computed, the node's incoming and broadcast connections
-        * initialize their maximum path depth. For iteration nodes, the visitor recurses
-        * into the step function.
-        * <p>
-        * Only nodes whose id is still -1 are visited.
-        */
-       public static final class IdAndEstimatesVisitor implements 
Visitor<OptimizerNode> {
-               
-               private final DataStatistics statistics;
-
-               private int id = 1;
-               
-               public IdAndEstimatesVisitor(DataStatistics statistics) {
-                       this.statistics = statistics;
-               }
-
-               @Override
-               public boolean preVisit(OptimizerNode visitable) {
-                       return visitable.getId() == -1;
-               }
-
-               @Override
-               public void postVisit(OptimizerNode visitable) {
-                       // the node ids
-                       visitable.initId(this.id++);
-                       
-                       // connections need to figure out their maximum path 
depths
-                       for (PactConnection conn : 
visitable.getIncomingConnections()) {
-                               conn.initMaxDepth();
-                       }
-                       for (PactConnection conn : 
visitable.getBroadcastConnections()) {
-                               conn.initMaxDepth();
-                       }
-                       
-                       // the estimates
-                       visitable.computeOutputEstimates(this.statistics);
-                       
-                       // if required, recurse into the step function
-                       if (visitable instanceof IterationNode) {
-                               ((IterationNode) 
visitable).acceptForStepFunction(this);
-                       }
-               }
-       }
-       
-       /**
-        * Visitor that computes the interesting properties for each node in the plan.
-        * On its recursive depth-first descend, it propagates all interesting properties
-        * top-down. All work is done in the pre-visit; the post-visit does nothing.
-        */
-       public static final class InterestingPropertyVisitor implements 
Visitor<OptimizerNode> {
-               
-               private CostEstimator estimator; // the cost estimator for 
maximal costs of an interesting property
-
-               /**
-                * Creates a new visitor that computes the interesting properties for all
-                * nodes in the plan. The given cost estimator is passed to each node when
-                * the interesting properties for its inputs are computed.
-                * 
-                * @param estimator
-                *        The cost estimator to estimate the maximal costs for interesting
-                *        properties.
-                */
-               public InterestingPropertyVisitor(CostEstimator estimator) {
-                       this.estimator = estimator;
-               }
-               
-               @Override
-               public boolean preVisit(OptimizerNode node) {
-                       // The interesting properties must be computed on the 
descend. In case a node has multiple outputs,
-                       // that computation must happen during the last descend.
-
-                       if (node.getInterestingProperties() == null && 
node.haveAllOutputConnectionInterestingProperties()) {
-                               
node.computeUnionOfInterestingPropertiesFromSuccessors();
-                               
node.computeInterestingPropertiesForInputs(this.estimator);
-                               return true;
-                       } else {
-                               return false;
-                       }
-               }
-
-               @Override
-               public void postVisit(OptimizerNode visitable) {}
-       }
-
-       /**
-        * On its re-ascend (post visit), this visitor computes auxiliary maps that are
-        * needed to support plans that are not a minimally connected DAG (such plans are
-        * not trees, but at least one node feeds its output into more than one other node).
-        * <p>
-        * Only nodes whose open branches are still null are visited. For iteration nodes,
-        * the step function is traversed before the node's own unclosed branch stack is
-        * computed.
-        */
-       public static final class BranchesVisitor implements 
Visitor<OptimizerNode> {
-               
-               @Override
-               public boolean preVisit(OptimizerNode node) {
-                       return node.getOpenBranches() == null;
-               }
-
-               @Override
-               public void postVisit(OptimizerNode node) {
-                       if (node instanceof IterationNode) {
-                               ((IterationNode) 
node).acceptForStepFunction(this);
-                       }
-
-                       node.computeUnclosedBranchStack();
-               }
-       }
-       
-       /**
-        * Finalization of the plan:
-        *  - The graph of nodes is double-linked (links from child to parent 
are inserted)
-        *  - If unions join static and dynamic paths, the cache is marked as a 
memory consumer
-        *  - Relative memory fractions are assigned to all nodes.
-        *  - All nodes are collected into a set.
-        */
-       private static final class PlanFinalizer implements Visitor<PlanNode> {
-               
-               private final Set<PlanNode> allNodes; // a set of all nodes in 
the optimizer plan
-
-               private final List<SourcePlanNode> sources; // all data source 
nodes in the optimizer plan
-
-               private final List<SinkPlanNode> sinks; // all data sink nodes 
in the optimizer plan
-               
-               private final Deque<IterationPlanNode> stackOfIterationNodes;
-
-               private int memoryConsumerWeights; // a counter of all memory 
consumers
-
-               /**
-                * Creates a new plan finalizer with empty collections for the nodes, the
-                * sources, and the sinks, and with an empty stack of iteration nodes.
-                */
-               private PlanFinalizer() {
-                       this.allNodes = new HashSet<PlanNode>();
-                       this.sources = new ArrayList<SourcePlanNode>();
-                       this.sinks = new ArrayList<SinkPlanNode>();
-                       this.stackOfIterationNodes = new 
ArrayDeque<IterationPlanNode>();
-               }
-
-               /**
-                * Traverses the plan from the given sinks and creates the final optimized plan.
-                * <p>
-                * The counter of memory consumer weights is reset before the traversal. If it
-                * is positive afterwards, relative memory fractions are assigned: a node with a
-                * positive consumer weight gets its weight divided by the total weight, and an
-                * input channel gets one divided by the total weight for a local strategy that
-                * dams, and the same fraction again for a temp mode other than NONE.
-                * NOTE(review): the counter is presumably increased by the visit methods during
-                * the traversal; that code is not part of this method - confirm.
-                * <p>
-                * The parameter <code>sinks</code> only defines the roots of the traversal. The
-                * returned plan is built from the finalizer's own fields (sources, sinks, and
-                * the set of all nodes).
-                *
-                * @param sinks The sink nodes from which the plan is traversed.
-                * @param jobName The job name that is passed on to the optimized plan.
-                * @param originalPlan The original plan that is passed on to the optimized plan.
-                * @return The optimized plan.
-                */
-               private OptimizedPlan createFinalPlan(List<SinkPlanNode> sinks, 
String jobName, Plan originalPlan) {
-                       this.memoryConsumerWeights = 0;
-                       
-                       // traverse the graph
-                       for (SinkPlanNode node : sinks) {
-                               node.accept(this);
-                       }
-
-                       // assign the memory to each node
-                       if (this.memoryConsumerWeights > 0) {
-                               for (PlanNode node : this.allNodes) {
-                                       // assign memory to the driver strategy 
of the node
-                                       final int consumerWeight = 
node.getMemoryConsumerWeight();
-                                       if (consumerWeight > 0) {
-                                               final double relativeMem = 
(double)consumerWeight / this.memoryConsumerWeights;
-                                               
node.setRelativeMemoryPerSubtask(relativeMem);
-                                               if (LOG.isDebugEnabled()) {
-                                                       LOG.debug("Assigned " + 
relativeMem + " of total memory to each subtask of " +
-                                                               
node.getPactContract().getName() + ".");
-                                               }
-                                       }
-                                       
-                                       // assign memory to the local and 
global strategies of the channels
-                                       for (Channel c : node.getInputs()) {
-                                               if 
(c.getLocalStrategy().dams()) {
-                                                       final double 
relativeMem = 1.0 / this.memoryConsumerWeights;
-                                                       
c.setRelativeMemoryLocalStrategy(relativeMem);
-                                                       if 
(LOG.isDebugEnabled()) {
-                                                               
LOG.debug("Assigned " + relativeMem + " of total memory to each local strategy 
" +
-                                                                               
"instance of " + c + ".");
-                                                       }
-                                               }
-                                               if (c.getTempMode() != 
TempMode.NONE) {
-                                                       final double 
relativeMem = 1.0/ this.memoryConsumerWeights;
-                                                       
c.setRelativeTempMemory(relativeMem);
-                                                       if 
(LOG.isDebugEnabled()) {
-                                                               
LOG.debug("Assigned " + relativeMem + " of total memory to each instance of the 
temp " +
-                                                                               
"table for " + c + ".");
-                                                       }
-                                               }
-                                       }
-                               }
-                       }
-                       return new OptimizedPlan(this.sources, this.sinks, 
this.allNodes, jobName, originalPlan);
-               }
-
-               @Override
-               public boolean preVisit(PlanNode visitable) {
-                       // if we come here again, prevent a further descend
-                       if (!this.allNodes.add(visitable)) {
-                               return false;
-                       }
-                       
-                       if (visitable instanceof SinkPlanNode) {
-                               this.sinks.add((SinkPlanNode) visitable);
-                       }
-                       else if (visitable instanceof SourcePlanNode) {
-                               this.sources.add((SourcePlanNode) visitable);
-                       }
-                       else if (visitable instanceof BinaryUnionPlanNode) {
-                               BinaryUnionPlanNode unionNode = 
(BinaryUnionPlanNode) visitable;
-                               if (unionNode.unionsStaticAndDynamicPath()) {
-                                       
unionNode.setDriverStrategy(DriverStrategy.UNION_WITH_CACHED);
-                               }
-                       }
-                       else if (visitable instanceof 
BulkPartialSolutionPlanNode) {
-                               // tell the partial solution about the 
iteration node that contains it
-                               final BulkPartialSolutionPlanNode pspn = 
(BulkPartialSolutionPlanNode) visitable;
-                               final IterationPlanNode iteration = 
this.stackOfIterationNodes.peekLast();
-                               
-                               // sanity check!
-                               if (iteration == null || !(iteration instanceof 
BulkIterationPlanNode)) {
-                                       throw new CompilerException("Bug: Error 
finalizing the plan. " +
-                                                       "Cannot associate the 
node for a partial solutions with its containing iteration.");
-                               }
-                               
pspn.setContainingIterationNode((BulkIterationPlanNode) iteration);
-                       }
-                       else if (visitable instanceof WorksetPlanNode) {
-                               // tell the workset node about the 
iteration node that contains it
-                               final WorksetPlanNode wspn = (WorksetPlanNode) 
visitable;
-                               final IterationPlanNode iteration = 
this.stackOfIterationNodes.peekLast();
-                               
-                               // sanity check!
-                               if (iteration == null || !(iteration instanceof 
WorksetIterationPlanNode)) {
-                                       throw new CompilerException("Bug: Error 
finalizing the plan. " +
-                                                       "Cannot associate the 
node for a partial solutions with its containing iteration.");
-                               }
-                               
wspn.setContainingIterationNode((WorksetIterationPlanNode) iteration);
-                       }
-                       else if (visitable instanceof SolutionSetPlanNode) {
-                               // tell the solution set node about the 
iteration node that contains it
-                               final SolutionSetPlanNode sspn = 
(SolutionSetPlanNode) visitable;
-                               final IterationPlanNode iteration = 
this.stackOfIterationNodes.peekLast();
-                               
-                               // sanity check!
-                               if (iteration == null || !(iteration instanceof 
WorksetIterationPlanNode)) {
-                                       throw new CompilerException("Bug: Error 
finalizing the plan. " +
-                                                       "Cannot associate the 
node for a partial solutions with its containing iteration.");
-                               }
-                               
sspn.setContainingIterationNode((WorksetIterationPlanNode) iteration);
-                       }
-                       
-                       // double-connect the connections. previously, only 
parents knew their children, because
-                       // one child candidate could have been referenced by 
multiple parents.
-                       for (Channel conn : visitable.getInputs()) {
-                               conn.setTarget(visitable);
-                               conn.getSource().addOutgoingChannel(conn);
-                       }
-                       
-                       for (Channel c : visitable.getBroadcastInputs()) {
-                               c.setTarget(visitable);
-                               c.getSource().addOutgoingChannel(c);
-                       }
-
-                       // count the memory consumption
-                       this.memoryConsumerWeights += 
visitable.getMemoryConsumerWeight();
-                       for (Channel c : visitable.getInputs()) {
-                               if (c.getLocalStrategy().dams()) {
-                                       this.memoryConsumerWeights++;
-                               }
-                               if (c.getTempMode() != TempMode.NONE) {
-                                       this.memoryConsumerWeights++;
-                               }
-                       }
-                       for (Channel c : visitable.getBroadcastInputs()) {
-                               if (c.getLocalStrategy().dams()) {
-                                       this.memoryConsumerWeights++;
-                               }
-                               if (c.getTempMode() != TempMode.NONE) {
-                                       this.memoryConsumerWeights++;
-                               }
-                       }
-                       
-                       // pass the visitor to the iteration's step function
-                       if (visitable instanceof IterationPlanNode) {
-                               // push the iteration node onto the stack
-                               final IterationPlanNode iterNode = 
(IterationPlanNode) visitable;
-                               this.stackOfIterationNodes.addLast(iterNode);
-                               
-                               // recurse
-                               ((IterationPlanNode) 
visitable).acceptForStepFunction(this);
-                               
-                               // pop the iteration node from the stack
-                               this.stackOfIterationNodes.removeLast();
-                       }
-                       return true;
-               }
-
-               @Override
-               public void postVisit(PlanNode visitable) {}
-       }
-       
-       /**
-        * A visitor that traverses the graph and collects cascading binary 
unions into a single n-ary
-        * union operator. The exception is, when one of the union inputs is 
materialized, such as in the
-        * static-code-path-cache in iterations.
-        */
-       private static final class BinaryUnionReplacer implements 
Visitor<PlanNode> {
-               
-               private final Set<PlanNode> seenBefore = new 
HashSet<PlanNode>();
-
-               @Override
-               public boolean preVisit(PlanNode visitable) {
-                       if (this.seenBefore.add(visitable)) {
-                               if (visitable instanceof IterationPlanNode) {
-                                       ((IterationPlanNode) 
visitable).acceptForStepFunction(this);
-                               }
-                               return true;
-                       } else {
-                               return false;
-                       }
-               }
-
-               @Override
-               public void postVisit(PlanNode visitable) {
-                       
-                       if (visitable instanceof BinaryUnionPlanNode) {
-                               
-                               final BinaryUnionPlanNode unionNode = 
(BinaryUnionPlanNode) visitable;
-                               final Channel in1 = unionNode.getInput1();
-                               final Channel in2 = unionNode.getInput2();
-                       
-                               if (!unionNode.unionsStaticAndDynamicPath()) {
-                                       
-                                       // both on static path, or both on 
dynamic path. we can collapse them
-                                       NAryUnionPlanNode newUnionNode;
-
-                                       List<Channel> inputs = new 
ArrayList<Channel>();
-                                       collect(in1, inputs);
-                                       collect(in2, inputs);
-
-                                       newUnionNode = new 
NAryUnionPlanNode(unionNode.getOptimizerNode(), inputs, 
-                                                       
unionNode.getGlobalProperties(), unionNode.getCumulativeCosts());
-                                       
-                                       
newUnionNode.setDegreeOfParallelism(unionNode.getDegreeOfParallelism());
-
-                                       for (Channel c : inputs) {
-                                               c.setTarget(newUnionNode);
-                                       }
-
-                                       for (Channel channel : 
unionNode.getOutgoingChannels()) {
-                                               
channel.swapUnionNodes(newUnionNode);
-                                               
newUnionNode.addOutgoingChannel(channel);
-                                       }
-                               }
-                               else {
-                                       // union between the static and the 
dynamic path. we need to handle this for now
-                                       // through a special union operator
-                                       
-                                       // make sure that the first input is 
the cached (static) and the second input is the dynamic
-                                       if (in1.isOnDynamicPath()) {
-                                               BinaryUnionPlanNode 
newUnionNode = new BinaryUnionPlanNode(unionNode);
-                                               
-                                               in1.setTarget(newUnionNode);
-                                               in2.setTarget(newUnionNode);
-                                               
-                                               for (Channel channel : 
unionNode.getOutgoingChannels()) {
-                                                       
channel.swapUnionNodes(newUnionNode);
-                                                       
newUnionNode.addOutgoingChannel(channel);
-                                               }
-                                       }
-                               }
-                       }
-               }
-               
-               private void collect(Channel in, List<Channel> inputs) {
-                       if (in.getSource() instanceof NAryUnionPlanNode) {
-                               // sanity check
-                               if (in.getShipStrategy() != 
ShipStrategyType.FORWARD) {
-                                       throw new CompilerException("Bug: Plan 
generation for Unions picked a ship strategy between binary plan operators.");
-                               }
-                               if (!(in.getLocalStrategy() == null || 
in.getLocalStrategy() == LocalStrategy.NONE)) {
-                                       throw new CompilerException("Bug: Plan 
generation for Unions picked a local strategy between binary plan operators.");
-                               }
-                               
-                               inputs.addAll(((NAryUnionPlanNode) 
in.getSource()).getListOfInputs());
-                       } else {
-                               // is not a collapsed union node, so we take 
the channel directly
-                               inputs.add(in);
-                       }
-               }
-       }
-       
-       private static final class StepFunctionValidator implements 
Visitor<Operator<?>> {
-
-               private final Set<Operator<?>> seenBefore = new 
HashSet<Operator<?>>();
-               
-               private boolean foundWorkset;
-               
-               @Override
-               public boolean preVisit(Operator<?> visitable) {
-                       if (visitable instanceof WorksetPlaceHolder) {
-                               foundWorkset = true;
-                       }
-                       
-                       return (!foundWorkset) && seenBefore.add(visitable);
-               }
-
-               @Override
-               public void postVisit(Operator<?> visitable) {}
-       }
-
-       // 
------------------------------------------------------------------------
-       // Miscellaneous
-       // 
------------------------------------------------------------------------
-       
-       private OptimizerPostPass getPostPassFromPlan(Plan program) {
-               final String className =  program.getPostPassClassName();
-               if (className == null) {
-                       throw new CompilerException("Optimizer Post Pass class 
description is null");
-               }
-               try {
-                       Class<? extends OptimizerPostPass> clazz = 
Class.forName(className).asSubclass(OptimizerPostPass.class);
-                       try {
-                               return InstantiationUtil.instantiate(clazz, 
OptimizerPostPass.class);
-                       } catch (RuntimeException rtex) {
-                               // unwrap the source exception
-                               if (rtex.getCause() != null) {
-                                       throw new CompilerException("Cannot 
instantiate optimizer post pass: " + rtex.getMessage(), rtex.getCause());
-                               } else {
-                                       throw rtex;
-                               }
-                       }
-               } catch (ClassNotFoundException cnfex) {
-                       throw new CompilerException("Cannot load Optimizer 
post-pass class '" + className + "'.", cnfex);
-               } catch (ClassCastException ccex) {
-                       throw new CompilerException("Class '" + className + "' 
is not an optimizer post passer.", ccex);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java
index f8404c4..d199ae7 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer.dag;
 
 import java.util.Collections;
@@ -62,8 +61,8 @@ public abstract class AbstractPartialSolutionNode extends 
OptimizerNode {
        }
 
        @Override
-       public List<PactConnection> getIncomingConnections() {
-               return Collections.<PactConnection>emptyList();
+       public List<DagConnection> getIncomingConnections() {
+               return Collections.emptyList();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
index 8aed10c..068799e 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
@@ -107,7 +107,7 @@ public class BinaryUnionNode extends TwoInputNode {
                final List<? extends PlanNode> subPlans1 = 
getFirstPredecessorNode().getAlternativePlans(estimator);
                final List<? extends PlanNode> subPlans2 = 
getSecondPredecessorNode().getAlternativePlans(estimator);
 
-               List<PactConnection> broadcastConnections = 
getBroadcastConnections();
+               List<DagConnection> broadcastConnections = 
getBroadcastConnections();
                if (broadcastConnections != null && broadcastConnections.size() 
> 0) {
                        throw new CompilerException("Found BroadcastVariables 
on a Union operation");
                }
@@ -122,9 +122,9 @@ public class BinaryUnionNode extends TwoInputNode {
                final ExecutionMode input1Mode = 
this.input1.getDataExchangeMode();
                final ExecutionMode input2Mode = 
this.input2.getDataExchangeMode();
 
-               final int dop = getDegreeOfParallelism();
-               final int inDop1 = 
getFirstPredecessorNode().getDegreeOfParallelism();
-               final int inDop2 = 
getSecondPredecessorNode().getDegreeOfParallelism();
+               final int dop = getParallelism();
+               final int inDop1 = getFirstPredecessorNode().getParallelism();
+               final int inDop2 = getSecondPredecessorNode().getParallelism();
 
                final boolean dopChange1 = dop != inDop1;
                final boolean dopChange2 = dop != inDop2;

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
index 8112748..c55d17a 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
@@ -31,7 +31,7 @@ import 
org.apache.flink.api.common.operators.base.BulkIterationBase;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler.InterestingPropertyVisitor;
+import org.apache.flink.optimizer.Optimizer.InterestingPropertyVisitor;
 import org.apache.flink.optimizer.costs.CostEstimator;
 import org.apache.flink.optimizer.dag.WorksetIterationNode.SingleRootJoiner;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -62,9 +62,9 @@ public class BulkIterationNode extends SingleInputNode 
implements IterationNode
        
        private OptimizerNode nextPartialSolution;
        
-       private PactConnection rootConnection;          // connection out of 
the next partial solution
+       private DagConnection rootConnection;           // connection out of 
the next partial solution
        
-       private PactConnection terminationCriterionRootConnection;      // 
connection out of the term. criterion
+       private DagConnection terminationCriterionRootConnection;       // 
connection out of the term. criterion
        
        private OptimizerNode singleRoot;
        
@@ -93,7 +93,7 @@ public class BulkIterationNode extends SingleInputNode 
implements IterationNode
        // 
--------------------------------------------------------------------------------------------
        
        public BulkIterationBase<?> getIterationContract() {
-               return (BulkIterationBase<?>) getPactContract();
+               return (BulkIterationBase<?>) getOperator();
        }
        
        /**
@@ -133,14 +133,14 @@ public class BulkIterationNode extends SingleInputNode 
implements IterationNode
                
                // check if the root of the step function has the same DOP as 
the iteration
                // or if the step function has any operator at all
-               if (nextPartialSolution.getDegreeOfParallelism() != 
getDegreeOfParallelism() ||
+               if (nextPartialSolution.getParallelism() != getParallelism() ||
                        nextPartialSolution == partialSolution || 
nextPartialSolution instanceof BinaryUnionNode)
                {
                        // add a no-op to the root to express the 
re-partitioning
                        NoOpNode noop = new NoOpNode();
-                       noop.setDegreeOfParallelism(getDegreeOfParallelism());
+                       noop.setDegreeOfParallelism(getParallelism());
 
-                       PactConnection noOpConn = new 
PactConnection(nextPartialSolution, noop, ExecutionMode.PIPELINED);
+                       DagConnection noOpConn = new 
DagConnection(nextPartialSolution, noop, ExecutionMode.PIPELINED);
                        noop.setIncomingConnection(noOpConn);
                        nextPartialSolution.addOutgoingConnection(noOpConn);
                        
@@ -152,13 +152,13 @@ public class BulkIterationNode extends SingleInputNode 
implements IterationNode
                
                if (terminationCriterion == null) {
                        this.singleRoot = nextPartialSolution;
-                       this.rootConnection = new 
PactConnection(nextPartialSolution, ExecutionMode.PIPELINED);
+                       this.rootConnection = new 
DagConnection(nextPartialSolution, ExecutionMode.PIPELINED);
                }
                else {
                        // we have a termination criterion
                        SingleRootJoiner singleRootJoiner = new 
SingleRootJoiner();
-                       this.rootConnection = new 
PactConnection(nextPartialSolution, singleRootJoiner, ExecutionMode.PIPELINED);
-                       this.terminationCriterionRootConnection = new 
PactConnection(terminationCriterion, singleRootJoiner,
+                       this.rootConnection = new 
DagConnection(nextPartialSolution, singleRootJoiner, ExecutionMode.PIPELINED);
+                       this.terminationCriterionRootConnection = new 
DagConnection(terminationCriterion, singleRootJoiner,
                                                                                
                                                                
ExecutionMode.PIPELINED);
 
                        singleRootJoiner.setInputs(this.rootConnection, 
this.terminationCriterionRootConnection);
@@ -323,7 +323,7 @@ public class BulkIterationNode extends SingleInputNode 
implements IterationNode
                                        locPropsReq.parameterizeChannel(toNoOp);
                                        
                                        UnaryOperatorNode rebuildPropertiesNode 
= new UnaryOperatorNode("Rebuild Partial Solution Properties", 
FieldList.EMPTY_LIST);
-                                       
rebuildPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism());
+                                       
rebuildPropertiesNode.setDegreeOfParallelism(candidate.getParallelism());
                                        
                                        SingleInputPlanNode 
rebuildPropertiesPlanNode = new SingleInputPlanNode(rebuildPropertiesNode, 
"Rebuild Partial Solution Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
                                        
rebuildPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), 
toNoOp.getLocalProperties());
@@ -352,7 +352,7 @@ public class BulkIterationNode extends SingleInputNode 
implements IterationNode
                // 5) Create a candidate for the Iteration Node for every 
remaining plan of the step function.
                if (terminationCriterion == null) {
                        for (PlanNode candidate : candidates) {
-                               BulkIterationPlanNode node = new 
BulkIterationPlanNode(this, "BulkIteration 
("+this.getPactContract().getName()+")", in, pspn, candidate);
+                               BulkIterationPlanNode node = new 
BulkIterationPlanNode(this, "BulkIteration ("+this.getOperator().getName()+")", 
in, pspn, candidate);
                                GlobalProperties gProps = 
candidate.getGlobalProperties().clone();
                                LocalProperties lProps = 
candidate.getLocalProperties().clone();
                                node.initProperties(gProps, lProps);
@@ -367,7 +367,7 @@ public class BulkIterationNode extends SingleInputNode 
implements IterationNode
                        for (PlanNode candidate : candidates) {
                                for (PlanNode terminationCandidate : 
terminationCriterionCandidates) {
                                        if 
(singleRoot.areBranchCompatible(candidate, terminationCandidate)) {
-                                               BulkIterationPlanNode node = 
new BulkIterationPlanNode(this, "BulkIteration 
("+this.getPactContract().getName()+")", in, pspn, candidate, 
terminationCandidate);
+                                               BulkIterationPlanNode node = 
new BulkIterationPlanNode(this, "BulkIteration 
("+this.getOperator().getName()+")", in, pspn, candidate, terminationCandidate);
                                                GlobalProperties gProps = 
candidate.getGlobalProperties().clone();
                                                LocalProperties lProps = 
candidate.getLocalProperties().clone();
                                                node.initProperties(gProps, 
lProps);

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
index a6e03ff..25a7eef 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer.dag;
 
 import java.util.Collections;
@@ -49,7 +48,8 @@ public class BulkPartialSolutionNode extends 
AbstractPartialSolutionNode {
                if (this.cachedPlans != null) {
                        throw new IllegalStateException();
                } else {
-                       this.cachedPlans = 
Collections.<PlanNode>singletonList(new BulkPartialSolutionPlanNode(this, 
"PartialSolution ("+this.getPactContract().getName()+")", gProps, lProps, 
initialInput));
+                       this.cachedPlans = 
Collections.<PlanNode>singletonList(new BulkPartialSolutionPlanNode(this,
+                                       "PartialSolution 
("+this.getOperator().getName()+")", gProps, lProps, initialInput));
                }
        }
        
@@ -73,13 +73,14 @@ public class BulkPartialSolutionNode extends 
AbstractPartialSolutionNode {
        // 
--------------------------------------------------------------------------------------------
 
        /**
-        * Gets the contract object for this data source node.
+        * Gets the operator (here the {@link PartialSolutionPlaceHolder}) that 
is represented by this
+        * optimizer node.
         * 
-        * @return The contract.
+        * @return The operator represented by this optimizer node.
         */
        @Override
-       public PartialSolutionPlaceHolder<?> getPactContract() {
-               return (PartialSolutionPlaceHolder<?>) super.getPactContract();
+       public PartialSolutionPlaceHolder<?> getOperator() {
+               return (PartialSolutionPlaceHolder<?>) super.getOperator();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
index 7c0cc9a..92076c3 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
@@ -50,8 +50,8 @@ public class CoGroupNode extends TwoInputNode {
         * @return The CoGroup operator.
         */
        @Override
-       public CoGroupOperatorBase<?, ?, ?, ?> getPactContract() {
-               return (CoGroupOperatorBase<?, ?, ?, ?>) 
super.getPactContract();
+       public CoGroupOperatorBase<?, ?, ?, ?> getOperator() {
+               return (CoGroupOperatorBase<?, ?, ?, ?>) super.getOperator();
        }
 
        @Override
@@ -85,7 +85,7 @@ public class CoGroupNode extends TwoInputNode {
                Ordering groupOrder1 = null;
                Ordering groupOrder2 = null;
                
-               CoGroupOperatorBase<?, ?, ?, ?> cgc = getPactContract();
+               CoGroupOperatorBase<?, ?, ?, ?> cgc = getOperator();
                groupOrder1 = cgc.getGroupOrderForInputOne();
                groupOrder2 = cgc.getGroupOrderForInputTwo();
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
index afeed1d..8de67e8 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
@@ -26,7 +26,7 @@ import 
org.apache.flink.api.common.operators.base.CrossOperatorBase;
 import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.operators.CrossBlockOuterFirstDescriptor;
 import org.apache.flink.optimizer.operators.CrossBlockOuterSecondDescriptor;
 import org.apache.flink.optimizer.operators.CrossStreamOuterFirstDescriptor;
@@ -50,7 +50,7 @@ public class CrossNode extends TwoInputNode {
                super(operation);
                
                Configuration conf = operation.getParameters();
-               String localStrategy = 
conf.getString(PactCompiler.HINT_LOCAL_STRATEGY, null);
+               String localStrategy = 
conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
        
                CrossHint hint = operation.getCrossHint();
                
@@ -60,13 +60,13 @@ public class CrossNode extends TwoInputNode {
                        final boolean allowBCsecond = hint != 
CrossHint.FIRST_IS_SMALL;
                        
                        final OperatorDescriptorDual fixedDriverStrat;
-                       if 
(PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST.equals(localStrategy))
 {
+                       if 
(Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST.equals(localStrategy))
 {
                                fixedDriverStrat = new 
CrossBlockOuterFirstDescriptor(allowBCfirst, allowBCsecond);
-                       } else if 
(PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND.equals(localStrategy))
 {
+                       } else if 
(Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND.equals(localStrategy))
 {
                                fixedDriverStrat = new 
CrossBlockOuterSecondDescriptor(allowBCfirst, allowBCsecond);
-                       } else if 
(PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST.equals(localStrategy))
 {
+                       } else if 
(Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST.equals(localStrategy))
 {
                                fixedDriverStrat = new 
CrossStreamOuterFirstDescriptor(allowBCfirst, allowBCsecond);
-                       } else if 
(PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND.equals(localStrategy))
 {
+                       } else if 
(Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND.equals(localStrategy))
 {
                                fixedDriverStrat = new 
CrossStreamOuterSecondDescriptor(allowBCfirst, allowBCsecond);
                        } else {
                                throw new CompilerException("Invalid local 
strategy hint for cross contract: " + localStrategy);
@@ -99,8 +99,8 @@ public class CrossNode extends TwoInputNode {
        // 
------------------------------------------------------------------------
 
        @Override
-       public CrossOperatorBase<?, ?, ?, ?> getPactContract() {
-               return (CrossOperatorBase<?, ?, ?, ?>) super.getPactContract();
+       public CrossOperatorBase<?, ?, ?, ?> getOperator() {
+               return (CrossOperatorBase<?, ?, ?, ?>) super.getOperator();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
new file mode 100644
index 0000000..360f579
--- /dev/null
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.optimizer.dataproperties.InterestingProperties;
+import org.apache.flink.optimizer.plandump.DumpableConnection;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+/**
+ * A connection between two operators. Represents an intermediate result
+ * and a data exchange between the two operators.
+ *
+ * The data exchange has a mode in which it performs (batch / pipelined).
+ *
+ * The data exchange strategy may be set on this connection, in which case
+ * it is fixed and will not be determined during candidate plan enumeration.
+ *
+ * During the enumeration of interesting properties, this connection also holds
+ * all interesting properties generated by the successor operator.
+ */
+public class DagConnection implements EstimateProvider, 
DumpableConnection<OptimizerNode> {
+       
+       private final OptimizerNode source; // The source node of the connection
+
+       private final OptimizerNode target; // The target node of the 
connection.
+
+       private final ExecutionMode dataExchangeMode; // defines whether to use 
batch or pipelined data exchange
+
+       private InterestingProperties interestingProps; // local properties 
that succeeding nodes are interested in
+
+       private ShipStrategyType shipStrategy; // The data shipping strategy, 
if predefined.
+       
+       private TempMode materializationMode = TempMode.NONE; // the 
materialization mode
+       
+       private int maxDepth = -1;
+
+       private boolean breakPipeline;  // whether this connection should break 
the pipeline due to potential deadlocks
+
+       /**
+        * Creates a new Connection between two nodes. The shipping strategy is 
initially unset (<tt>null</tt>).
+        * The temp mode is by default <tt>NONE</tt>.
+        * 
+        * @param source
+        *        The source node.
+        * @param target
+        *        The target node.
+        */
+       public DagConnection(OptimizerNode source, OptimizerNode target, 
ExecutionMode exchangeMode) {
+               this(source, target, null, exchangeMode);
+       }
+
+       /**
+        * Creates a new Connection between two nodes.
+        * 
+        * @param source
+        *        The source node.
+        * @param target
+        *        The target node.
+        * @param shipStrategy
+        *        The shipping strategy.
+        * @param exchangeMode
+        *        The data exchange mode (pipelined / batch / batch only for 
shuffles / ... )
+        */
+       public DagConnection(OptimizerNode source, OptimizerNode target,
+                                                ShipStrategyType shipStrategy, 
ExecutionMode exchangeMode)
+       {
+               if (source == null || target == null) {
+                       throw new NullPointerException("Source and target must 
not be null.");
+               }
+               this.source = source;
+               this.target = target;
+               this.shipStrategy = shipStrategy;
+               this.dataExchangeMode = exchangeMode;
+       }
+       
+       /**
+        * Constructor to create a result from an operator that is not
+        * consumed by another operator.
+        * 
+        * @param source
+        *        The source node.
+        */
+       public DagConnection(OptimizerNode source, ExecutionMode exchangeMode) {
+               if (source == null) {
+                       throw new NullPointerException("Source and target must 
not be null.");
+               }
+               this.source = source;
+               this.target = null;
+               this.shipStrategy = ShipStrategyType.NONE;
+               this.dataExchangeMode = exchangeMode;
+       }
+
+       /**
+        * Gets the source of the connection.
+        * 
+        * @return The source Node.
+        */
+       public OptimizerNode getSource() {
+               return this.source;
+       }
+
+       /**
+        * Gets the target of the connection.
+        * 
+        * @return The target node.
+        */
+       public OptimizerNode getTarget() {
+               return this.target;
+       }
+
+       /**
+        * Gets the shipping strategy for this connection.
+        * 
+        * @return The connection's shipping strategy.
+        */
+       public ShipStrategyType getShipStrategy() {
+               return this.shipStrategy;
+       }
+
+       /**
+        * Sets the shipping strategy for this connection.
+        * 
+        * @param strategy
+        *        The shipping strategy to be applied to this connection.
+        */
+       public void setShipStrategy(ShipStrategyType strategy) {
+               this.shipStrategy = strategy;
+       }
+
+       /**
+        * Gets the data exchange mode to use for this connection.
+        *
+        * @return The data exchange mode to use for this connection.
+        */
+       public ExecutionMode getDataExchangeMode() {
+               if (dataExchangeMode == null) {
+                       throw new IllegalStateException("This connection does 
not have the data exchange mode set");
+               }
+               return dataExchangeMode;
+       }
+
+       /**
+        * Marks that this connection should do a decoupled data exchange (such 
as batched)
+        * rather than pipeline data. Connections are marked as pipeline 
breakers to avoid
+        * deadlock situations.
+        */
+       public void markBreaksPipeline() {
+               this.breakPipeline = true;
+       }
+
+       /**
+        * Checks whether this connection is marked to break the pipeline.
+        *
+        * @return True, if this connection is marked to break the pipeline, 
false otherwise.
+        */
+       public boolean isBreakingPipeline() {
+               return this.breakPipeline;
+       }
+
+       /**
+        * Gets the interesting properties object for this pact connection.
+        * If the interesting properties for this connection have not yet been 
set,
+        * this method returns null.
+        * 
+        * @return The collection of all interesting properties, or null, if 
they have not yet been set.
+        */
+       public InterestingProperties getInterestingProperties() {
+               return this.interestingProps;
+       }
+
+       /**
+        * Sets the interesting properties for this pact connection.
+        * 
+        * @param props The interesting properties.
+        */
+       public void setInterestingProperties(InterestingProperties props) {
+               if (this.interestingProps == null) {
+                       this.interestingProps = props;
+               } else {
+                       throw new IllegalStateException("Interesting Properties 
have already been set.");
+               }
+       }
+       
+       public void clearInterestingProperties() {
+               this.interestingProps = null;
+       }
+       
+       public void initMaxDepth() {
+               
+               if (this.maxDepth == -1) {
+                       this.maxDepth = this.source.getMaxDepth() + 1;
+               } else {
+                       throw new IllegalStateException("Maximum path depth has 
already been initialized.");
+               }
+       }
+       
+       public int getMaxDepth() {
+               if (this.maxDepth != -1) {
+                       return this.maxDepth;
+               } else {
+                       throw new IllegalStateException("Maximum path depth has 
not been initialized.");
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Estimates
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public long getEstimatedOutputSize() {
+               return this.source.getEstimatedOutputSize();
+       }
+
+       @Override
+       public long getEstimatedNumRecords() {
+               return this.source.getEstimatedNumRecords();
+       }
+       
+       @Override
+       public float getEstimatedAvgWidthPerOutputRecord() {
+               return this.source.getEstimatedAvgWidthPerOutputRecord();
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+
+       
+       public TempMode getMaterializationMode() {
+               return this.materializationMode;
+       }
+       
+       public void setMaterializationMode(TempMode materializationMode) {
+               this.materializationMode = materializationMode;
+       }
+       
+       public boolean isOnDynamicPath() {
+               return this.source.isOnDynamicPath();
+       }
+       
+       public int getCostWeight() {
+               return this.source.getCostWeight();
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       public String toString() {
+               StringBuilder buf = new StringBuilder(50);
+               buf.append("Connection: ");
+
+               if (this.source == null) {
+                       buf.append("null");
+               } else {
+                       buf.append(this.source.getOperator().getName());
+                       
buf.append('(').append(this.source.getName()).append(')');
+               }
+
+               buf.append(" -> ");
+
+               if (this.shipStrategy != null) {
+                       buf.append('[');
+                       buf.append(this.shipStrategy.name());
+                       buf.append(']').append(' ');
+               }
+
+               if (this.target == null) {
+                       buf.append("null");
+               } else {
+                       buf.append(this.target.getOperator().getName());
+                       
buf.append('(').append(this.target.getName()).append(')');
+               }
+
+               return buf.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
index 9e4f457..dbe04f4 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
@@ -46,7 +46,7 @@ import org.apache.flink.util.Visitor;
  */
 public class DataSinkNode extends OptimizerNode {
        
-       protected PactConnection input;                 // The input edge
+       protected DagConnection input;                  // The input edge
        
        /**
         * Creates a new DataSinkNode for the given sink operator.
@@ -64,7 +64,7 @@ public class DataSinkNode extends OptimizerNode {
         * 
         * @return The input connection.
         */
-       public PactConnection getInputConnection() {
+       public DagConnection getInputConnection() {
                return this.input;
        }
        
@@ -87,8 +87,8 @@ public class DataSinkNode extends OptimizerNode {
         * @return The node's underlying operator.
         */
        @Override
-       public GenericDataSinkBase<?> getPactContract() {
-               return (GenericDataSinkBase<?>) super.getPactContract();
+       public GenericDataSinkBase<?> getOperator() {
+               return (GenericDataSinkBase<?>) super.getOperator();
        }
 
        @Override
@@ -97,7 +97,7 @@ public class DataSinkNode extends OptimizerNode {
        }
 
        @Override
-       public List<PactConnection> getIncomingConnections() {
+       public List<DagConnection> getIncomingConnections() {
                return Collections.singletonList(this.input);
        }
 
@@ -107,19 +107,19 @@ public class DataSinkNode extends OptimizerNode {
         * @return An empty list.
         */
        @Override
-       public List<PactConnection> getOutgoingConnections() {
+       public List<DagConnection> getOutgoingConnections() {
                return Collections.emptyList();
        }
 
        @Override
        public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, 
ExecutionMode defaultExchangeMode) {
-               Operator<?> children = getPactContract().getInput();
+               Operator<?> children = getOperator().getInput();
 
                final OptimizerNode pred;
-               final PactConnection conn;
+               final DagConnection conn;
                
                pred = contractToNode.get(children);
-               conn = new PactConnection(pred, this, defaultExchangeMode);
+               conn = new DagConnection(pred, this, defaultExchangeMode);
                        
                // create the connection and add it
                this.input = conn;
@@ -141,8 +141,8 @@ public class DataSinkNode extends OptimizerNode {
                final InterestingProperties iProps = new 
InterestingProperties();
                
                {
-                       final Ordering partitioning = 
getPactContract().getPartitionOrdering();
-                       final DataDistribution dataDist = 
getPactContract().getDataDistribution();
+                       final Ordering partitioning = 
getOperator().getPartitionOrdering();
+                       final DataDistribution dataDist = 
getOperator().getDataDistribution();
                        final RequestedGlobalProperties partitioningProps = new 
RequestedGlobalProperties();
                        if (partitioning != null) {
                                if(dataDist != null) {
@@ -156,7 +156,7 @@ public class DataSinkNode extends OptimizerNode {
                }
                
                {
-                       final Ordering localOrder = 
getPactContract().getLocalOrder();
+                       final Ordering localOrder = 
getOperator().getLocalOrder();
                        final RequestedLocalProperties orderProps = new 
RequestedLocalProperties();
                        if (localOrder != null) {
                                orderProps.setOrdering(localOrder);
@@ -184,7 +184,7 @@ public class DataSinkNode extends OptimizerNode {
        }
        
        @Override
-       protected List<UnclosedBranchDescriptor> 
getBranchesForParent(PactConnection parent) {
+       protected List<UnclosedBranchDescriptor> 
getBranchesForParent(DagConnection parent) {
                // return our own stack of open branches, because nothing is 
added
                return this.openBranches;
        }
@@ -204,8 +204,8 @@ public class DataSinkNode extends OptimizerNode {
                List<? extends PlanNode> subPlans = 
getPredecessorNode().getAlternativePlans(estimator);
                List<PlanNode> outputPlans = new ArrayList<PlanNode>();
                
-               final int dop = getDegreeOfParallelism();
-               final int inDop = getPredecessorNode().getDegreeOfParallelism();
+               final int dop = getParallelism();
+               final int inDop = getPredecessorNode().getParallelism();
 
                final ExecutionMode executionMode = 
this.input.getDataExchangeMode();
                final boolean dopChange = dop != inDop;
@@ -224,7 +224,7 @@ public class DataSinkNode extends OptimizerNode {
                                        // no need to check whether the created 
properties meet what we need in case
                                        // of ordering or global ordering, 
because the only interesting properties we have
                                        // are what we require
-                                       outputPlans.add(new SinkPlanNode(this, 
"DataSink ("+this.getPactContract().getName()+")" ,c));
+                                       outputPlans.add(new SinkPlanNode(this, 
"DataSink ("+this.getOperator().getName()+")" ,c));
                                }
                        }
                }

Reply via email to