http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
deleted file mode 100644
index 6f634fb..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
+++ /dev/null
@@ -1,573 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plan;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.OptimizerNode.UnclosedBranchDescriptor;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.plandump.DumpableConnection;
-import org.apache.flink.optimizer.plandump.DumpableNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.util.Visitable;
-
-/**
- * The representation of a data exchange between to operators. The data 
exchange can realize a shipping strategy, 
- * which established global properties, and a local strategy, which 
establishes local properties.
- * <p>
- * Because we currently deal only with plans where the operator order is 
fixed, many properties are equal
- * among candidates and are determined prior to the enumeration (such as for 
example constant/dynamic path membership).
- * Hence, many methods will delegate to the {@code OptimizerNode} that 
represents the node this candidate was
- * created for.
- */
-public abstract class PlanNode implements Visitable<PlanNode>, 
DumpableNode<PlanNode> {
-       
-       protected final OptimizerNode template;
-       
-       protected final List<Channel> outChannels;
-       
-       private List<NamedChannel> broadcastInputs;
-       
-       private final String nodeName; 
-       
-       private DriverStrategy driverStrategy;  // The local strategy (sorting 
/ hashing, ...)
-       
-       protected LocalProperties localProps;                   // local 
properties of the data produced by this node
-
-       protected GlobalProperties globalProps;                 // global 
properties of the data produced by this node
-       
-       protected Map<OptimizerNode, PlanNode> branchPlan; // the actual plan 
alternative chosen at a branch point
-       
-       protected Costs nodeCosts;                                              
// the costs incurred by this node
-
-       protected Costs cumulativeCosts;                                        
// the cumulative costs of all operators in the sub-tree
-       
-       private double relativeMemoryPerSubTask;                                
        // the amount of memory dedicated to each task, in bytes
-       
-       private int parallelism;
-       
-       private boolean pFlag;                                                  
// flag for the internal pruning algorithm
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public PlanNode(OptimizerNode template, String nodeName, DriverStrategy 
strategy) {
-               this.outChannels = new ArrayList<Channel>(2);
-               this.broadcastInputs = new ArrayList<NamedChannel>();
-               this.template = template;
-               this.nodeName = nodeName;
-               this.driverStrategy = strategy;
-               
-               this.parallelism = template.getParallelism();
-
-               // check, if there is branch at this node. if yes, this 
candidate must be associated with
-               // the branching template node.
-               if (template.isBranching()) {
-                       this.branchPlan = new HashMap<OptimizerNode, 
PlanNode>(6);
-                       this.branchPlan.put(template, this);
-               }
-       }
-       
-       protected void mergeBranchPlanMaps(PlanNode pred1, PlanNode pred2) {
-               mergeBranchPlanMaps(pred1.branchPlan, pred2.branchPlan);
-       }
-       
-       protected void mergeBranchPlanMaps(Map<OptimizerNode, PlanNode> 
branchPlan1, Map<OptimizerNode, PlanNode> branchPlan2) {
-               // merge the branchPlan maps according the template's 
uncloseBranchesStack
-               if (this.template.hasUnclosedBranches()) {
-                       if (this.branchPlan == null) {
-                               this.branchPlan = new HashMap<OptimizerNode, 
PlanNode>(8);
-                       }
-       
-                       for (UnclosedBranchDescriptor uc : 
this.template.getOpenBranches()) {
-                               OptimizerNode brancher = uc.getBranchingNode();
-                               PlanNode selectedCandidate = null;
-       
-                               if (branchPlan1 != null) {
-                                       // predecessor 1 has branching 
children, see if it got the branch we are looking for
-                                       selectedCandidate = 
branchPlan1.get(brancher);
-                               }
-                               
-                               if (selectedCandidate == null && branchPlan2 != 
null) {
-                                       // predecessor 2 has branching 
children, see if it got the branch we are looking for
-                                       selectedCandidate = 
branchPlan2.get(brancher);
-                               }
-                               
-                               // it may be that the branch candidate is only 
found once the broadcast variables are set
-                               if (selectedCandidate != null) {
-                                       this.branchPlan.put(brancher, 
selectedCandidate);
-                               }
-                       }
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //                                           Accessors
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Gets the node from the optimizer DAG for which this plan candidate 
node was created.
-        * 
-        * @return The optimizer's DAG node.
-        */
-       public OptimizerNode getOriginalOptimizerNode() {
-               return this.template;
-       }
-       
-       /**
-        * Gets the program operator that this node represents in the plan.
-        * 
-        * @return The program operator this node represents in the plan.
-        */
-       public Operator<?> getProgramOperator() {
-               return this.template.getOperator();
-       }
-       
-       /**
-        * Gets the name of the plan node.
-        * 
-        * @return The name of the plan node.
-        */
-       public String getNodeName() {
-               return this.nodeName;
-       }
-       
-       public int getMemoryConsumerWeight() {
-               return this.driverStrategy.isMaterializing() ? 1 : 0;
-       }
-       
-       /**
-        * Gets the memory dedicated to each sub-task for this node.
-        * 
-        * @return The memory per task, in bytes.
-        */
-       public double getRelativeMemoryPerSubTask() {
-               return this.relativeMemoryPerSubTask;
-       }
-
-       /**
-        * Sets the memory dedicated to each task for this node.
-        * 
-        * @param relativeMemoryPerSubtask The relative memory per sub-task
-        */
-       public void setRelativeMemoryPerSubtask(double 
relativeMemoryPerSubtask) {
-               this.relativeMemoryPerSubTask = relativeMemoryPerSubtask;
-       }
-       
-       /**
-        * Gets the driver strategy from this node. This determines for example 
for a <i>match</i> Pact whether
-        * to use a merge or a hybrid hash strategy.
-        * 
-        * @return The driver strategy.
-        */
-       public DriverStrategy getDriverStrategy() {
-               return this.driverStrategy;
-       }
-       
-       /**
-        * Sets the driver strategy for this node. Usually should not be 
changed.
-        * 
-        * @param newDriverStrategy The driver strategy.
-        */
-       public void setDriverStrategy(DriverStrategy newDriverStrategy) {
-               this.driverStrategy = newDriverStrategy;
-       }
-       
-       public void initProperties(GlobalProperties globals, LocalProperties 
locals) {
-               if (this.globalProps != null || this.localProps != null) {
-                       throw new IllegalStateException();
-               }
-               this.globalProps = globals;
-               this.localProps = locals;
-       }
-       
-       /**
-        * Gets the local properties from this PlanNode.
-        *
-        * @return The local properties.
-        */
-       public LocalProperties getLocalProperties() {
-               return this.localProps;
-       }
-       
-       /**
-        * Gets the global properties from this PlanNode.
-        *
-        * @return The global properties.
-        */
-       public GlobalProperties getGlobalProperties() {
-               return this.globalProps;
-       }
-       
-       /**
-        * Gets the costs incurred by this node. The costs reflect also the 
costs incurred by the shipping strategies
-        * of the incoming connections.
-        * 
-        * @return The node-costs, or null, if not yet set.
-        */
-       public Costs getNodeCosts() {
-               return this.nodeCosts;
-       }
-
-       /**
-        * Gets the cumulative costs of this nose. The cumulative costs are the 
sum of the costs
-        * of this node and of all nodes in the subtree below this node.
-        * 
-        * @return The cumulative costs, or null, if not yet set.
-        */
-       public Costs getCumulativeCosts() {
-               return this.cumulativeCosts;
-       }
-
-       public Costs getCumulativeCostsShare() {
-               if (this.cumulativeCosts == null) {
-                       return null;
-               } else {
-                       Costs result = cumulativeCosts.clone();
-                       if (this.template.getOutgoingConnections() != null) {
-                               int outDegree = 
this.template.getOutgoingConnections().size();
-                               if (outDegree > 0) {
-                                       result.divideBy(outDegree);
-                               }
-                       }
-
-                       return result;
-               }
-       }
-
-       
-       /**
-        * Sets the basic cost for this node to the given value, and sets the 
cumulative costs
-        * to those costs plus the cost shares of all inputs (regular and 
broadcast).
-        * 
-        * @param nodeCosts      The already knows costs for this node
-        *                                              (this cost a produces 
by a concrete {@code OptimizerNode} subclass.
-        */
-       public void setCosts(Costs nodeCosts) {
-               // set the node costs
-               this.nodeCosts = nodeCosts;
-               
-               // the cumulative costs are the node costs plus the costs of 
all inputs
-               this.cumulativeCosts = nodeCosts.clone();
-               
-               // add all the normal inputs
-               for (PlanNode pred : getPredecessors()) {
-                       
-                       Costs parentCosts = pred.getCumulativeCostsShare();
-                       if (parentCosts != null) {
-                               this.cumulativeCosts.addCosts(parentCosts);
-                       } else {
-                               throw new CompilerException("Trying to set the 
costs of an operator before the predecessor costs are computed.");
-                       }
-               }
-               
-               // add all broadcast variable inputs
-               if (this.broadcastInputs != null) {
-                       for (NamedChannel nc : this.broadcastInputs) {
-                               Costs bcInputCost = 
nc.getSource().getCumulativeCostsShare();
-                               if (bcInputCost != null) {
-                                       
this.cumulativeCosts.addCosts(bcInputCost);
-                               } else {
-                                       throw new CompilerException("Trying to 
set the costs of an operator before the broadcast input costs are computed.");
-                               }
-                       }
-               }
-       }
-       
-       public void setParallelism(int parallelism) {
-               this.parallelism = parallelism;
-       }
-       
-       public int getParallelism() {
-               return this.parallelism;
-       }
-       
-       public long getGuaranteedAvailableMemory() {
-               return this.template.getMinimalMemoryAcrossAllSubTasks();
-       }
-
-       public Map<OptimizerNode, PlanNode> getBranchPlan() {
-               return branchPlan;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //                               Input, Predecessors, Successors
-       // 
--------------------------------------------------------------------------------------------
-       
-       public abstract Iterable<Channel> getInputs();
-       
-       @Override
-       public abstract Iterable<PlanNode> getPredecessors();
-       
-       /**
-        * Sets a list of all broadcast inputs attached to this node.
-        */
-       public void setBroadcastInputs(List<NamedChannel> broadcastInputs) {
-               if (broadcastInputs != null) {
-                       this.broadcastInputs = broadcastInputs;
-                       
-                       // update the branch map
-                       for (NamedChannel nc : broadcastInputs) {
-                               PlanNode source = nc.getSource();
-                               
-                               mergeBranchPlanMaps(branchPlan, 
source.branchPlan);
-                       }
-               }
-               
-               // do a sanity check that if we are branching, we have now 
candidates for each branch point
-               if (this.template.hasUnclosedBranches()) {
-                       if (this.branchPlan == null) {
-                               throw new CompilerException("Branching and 
rejoining logic did not find a candidate for the branching point.");
-                       }
-       
-                       for (UnclosedBranchDescriptor uc : 
this.template.getOpenBranches()) {
-                               OptimizerNode brancher = uc.getBranchingNode();
-                               if (this.branchPlan.get(brancher) == null) {
-                                       throw new CompilerException("Branching 
and rejoining logic did not find a candidate for the branching point.");
-                               }
-                       }
-               }
-       }
-       
-       /**
-        * Gets a list of all broadcast inputs attached to this node.
-        */
-       public List<NamedChannel> getBroadcastInputs() {
-               return this.broadcastInputs;
-       }
-       
-       /**
-        * Adds a channel to a successor node to this node.
-        * 
-        * @param channel The channel to the successor.
-        */
-       public void addOutgoingChannel(Channel channel) {
-               this.outChannels.add(channel);
-       }
-       
-       /**
-        * Gets a list of all outgoing channels leading to successors.
-        * 
-        * @return A list of all channels leading to successors.
-        */
-       public List<Channel> getOutgoingChannels() {
-               return this.outChannels;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //                                Miscellaneous
-       // 
--------------------------------------------------------------------------------------------
-       
-       public void updatePropertiesWithUniqueSets(Set<FieldSet> 
uniqueFieldCombinations) {
-               if (uniqueFieldCombinations == null || 
uniqueFieldCombinations.isEmpty()) {
-                       return;
-               }
-               for (FieldSet fields : uniqueFieldCombinations) {
-                       this.globalProps.addUniqueFieldCombination(fields);
-                       this.localProps = 
this.localProps.addUniqueFields(fields);
-               }
-       }
-
-       public PlanNode getCandidateAtBranchPoint(OptimizerNode branchPoint) {
-               if (branchPlan == null) {
-                       return null;
-               } else {
-                       return this.branchPlan.get(branchPoint);
-               }
-       }
-       
-       /**
-        * Sets the pruning marker to true.
-        */
-       public void setPruningMarker() {
-               this.pFlag = true;
-       }
-       
-       /**
-        * Checks whether the pruning marker was set.
-        * 
-        * @return True, if the pruning marker was set, false otherwise.
-        */
-       public boolean isPruneMarkerSet() {
-               return this.pFlag;
-       }
-       
-       public boolean isOnDynamicPath() {
-               return this.template.isOnDynamicPath();
-       }
-       
-       public int getCostWeight() {
-               return this.template.getCostWeight();
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Checks whether this node has a dam on the way down to the given 
source node. This method
-        * returns either that (a) the source node is not found as a 
(transitive) child of this node,
-        * (b) the node is found, but no dam is on the path, or (c) the node is 
found and a dam is on
-        * the path.
-        * 
-        * @param source The node on the path to which the dam is sought.
-        * @return The result whether the node is found and whether a dam is on 
the path.
-        */
-       public abstract SourceAndDamReport hasDamOnPathDownTo(PlanNode source);
-       
-       public FeedbackPropertiesMeetRequirementsReport 
checkPartialSolutionPropertiesMet(PlanNode partialSolution, GlobalProperties 
feedbackGlobal, LocalProperties feedbackLocal) {
-               if (this == partialSolution) {
-                       return FeedbackPropertiesMeetRequirementsReport.PENDING;
-               }
-               
-               boolean found = false;
-               boolean allMet = true;
-               boolean allLocallyMet = true;
-               
-               for (Channel input : getInputs()) {
-                       FeedbackPropertiesMeetRequirementsReport inputState = 
input.getSource().checkPartialSolutionPropertiesMet(partialSolution, 
feedbackGlobal, feedbackLocal);
-                       
-                       if (inputState == 
FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
-                               continue;
-                       }
-                       else if (inputState == 
FeedbackPropertiesMeetRequirementsReport.MET) {
-                               found = true;
-                               continue;
-                       }
-                       else if (inputState == 
FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
-                               return 
FeedbackPropertiesMeetRequirementsReport.NOT_MET;
-                       }
-                       else {
-                               found = true;
-                               
-                               // the partial solution was on the path here. 
check whether the channel requires
-                               // certain properties that are met, or whether 
the channel introduces new properties
-                               
-                               // if the plan introduces new global 
properties, then we can stop looking whether
-                               // the feedback properties are sufficient to 
meet the requirements
-                               if (input.getShipStrategy() != 
ShipStrategyType.FORWARD && input.getShipStrategy() != ShipStrategyType.NONE) {
-                                       continue;
-                               }
-                               
-                               // first check whether this channel requires 
something that is not met
-                               if (input.getRequiredGlobalProps() != null && 
!input.getRequiredGlobalProps().isMetBy(feedbackGlobal)) {
-                                       return 
FeedbackPropertiesMeetRequirementsReport.NOT_MET;
-                               }
-                               
-                               // in general, not everything is met here 
already
-                               allMet = false;
-                               
-                               // if the plan introduces new local properties, 
we can stop checking for matching local properties
-                               if (inputState != 
FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET) {
-                                       
-                                       if (input.getLocalStrategy() == 
LocalStrategy.NONE) {
-                                               
-                                               if 
(input.getRequiredLocalProps() != null && 
!input.getRequiredLocalProps().isMetBy(feedbackLocal)) {
-                                                       return 
FeedbackPropertiesMeetRequirementsReport.NOT_MET;
-                                               }
-                                               
-                                               allLocallyMet = false;
-                                       }
-                               }
-                       }
-               }
-               
-               if (!found) {
-                       return 
FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION;
-               } else if (allMet) {
-                       return FeedbackPropertiesMeetRequirementsReport.MET;
-               } else if (allLocallyMet) {
-                       return 
FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET;
-               } else {
-                       return FeedbackPropertiesMeetRequirementsReport.PENDING;
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-
-       @Override
-       public String toString() {
-               return this.template.getName() + " \"" + 
getProgramOperator().getName() + "\" : " + this.driverStrategy +
-                               " [[ " + this.globalProps + " ]] [[ " + 
this.localProps + " ]]";
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public OptimizerNode getOptimizerNode() {
-               return this.template;
-       }
-
-       @Override
-       public PlanNode getPlanNode() {
-               return this;
-       }
-
-       @Override
-       public Iterable<DumpableConnection<PlanNode>> getDumpableInputs() {
-               List<DumpableConnection<PlanNode>> allInputs = new 
ArrayList<DumpableConnection<PlanNode>>();
-               
-               for (Channel c : getInputs()) {
-                       allInputs.add(c);
-               }
-               
-               for (NamedChannel c : getBroadcastInputs()) {
-                       allInputs.add(c);
-               }
-               
-               return allInputs;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public static enum SourceAndDamReport {
-               NOT_FOUND, FOUND_SOURCE, FOUND_SOURCE_AND_DAM;
-       }
-       
-       
-       
-       public static enum FeedbackPropertiesMeetRequirementsReport {
-               /** Indicates that the path is irrelevant */
-               NO_PARTIAL_SOLUTION,
-               
-               /** Indicates that the question whether the properties are met 
has been determined pending
-                * dependent on global and local properties */
-               PENDING,
-               
-               /** Indicates that the question whether the properties are met 
has been determined pending
-                * dependent on global properties only */
-               PENDING_LOCAL_MET,
-               
-               /** Indicates that the question whether the properties are met 
has been determined true */
-               MET,
-               
-               /** Indicates that the question whether the properties are met 
has been determined false */
-               NOT_MET;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
deleted file mode 100644
index b928be7..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.runtime.operators.DamBehavior;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.Visitor;
-
-/**
- * Plan candidate node for operators that read from exactly one regular input {@link Channel}.
- * Broadcast inputs, if any, are obtained through {@code getBroadcastInputs()}.
- *
- * <p>For every comparator required by the node's {@link DriverStrategy}, the node keeps one slot
- * for the key fields, one for the sort orders and one for the comparator factory. All three are
- * addressed by the same comparator id.
- */
-public class SingleInputPlanNode extends PlanNode {
-       
-       /** The single regular input of this node. */
-       protected final Channel input;
-       
-       /** Key fields per driver comparator, indexed by comparator id. */
-       protected final FieldList[] driverKeys;
-       
-       /** Sort orders per driver comparator, indexed by comparator id. */
-       protected final boolean[][] driverSortOrders;
-       
-       /** Comparator factories, indexed by comparator id. */
-       private final TypeComparatorFactory<?>[] comparators;
-       
-       public Object postPassHelper;
-       
-       // --------------------------------------------------------------------------------------------
-
-       /**
-        * Creates a node without driver key information.
-        *
-        * @param template The optimizer node that this candidate is created for.
-        * @param nodeName The name of the node.
-        * @param input The input channel of the node.
-        * @param driverStrategy The driver strategy of the node.
-        */
-       public SingleInputPlanNode(OptimizerNode template, String nodeName, Channel input,
-                       DriverStrategy driverStrategy)
-       {
-               this(template, nodeName, input, driverStrategy, null, null);
-       }
-       
-       /**
-        * Creates a node whose first driver comparator uses the given keys, all with sort order
-        * {@code true}.
-        *
-        * @param template The optimizer node that this candidate is created for.
-        * @param nodeName The name of the node.
-        * @param input The input channel of the node.
-        * @param driverStrategy The driver strategy of the node.
-        * @param driverKeyFields The key fields of the first driver comparator. May be {@code null},
-        *                        in which case no sort orders are set either.
-        */
-       public SingleInputPlanNode(OptimizerNode template, String nodeName, Channel input,
-                       DriverStrategy driverStrategy, FieldList driverKeyFields)
-       {
-               this(template, nodeName, input, driverStrategy, driverKeyFields,
-                               trueArrayFor(driverKeyFields));
-       }
-       
-       /**
-        * Creates a node whose first driver comparator uses the given keys and sort orders.
-        *
-        * @param template The optimizer node that this candidate is created for.
-        * @param nodeName The name of the node.
-        * @param input The input channel of the node.
-        * @param driverStrategy The driver strategy of the node.
-        * @param driverKeyFields The key fields of the first driver comparator.
-        * @param driverSortOrders The sort orders of the first driver comparator.
-        */
-       public SingleInputPlanNode(OptimizerNode template, String nodeName, Channel input,
-                       DriverStrategy driverStrategy, FieldList driverKeyFields, boolean[] driverSortOrders)
-       {
-               super(template, nodeName, driverStrategy);
-               this.input = input;
-               
-               final int numComparators = driverStrategy.getNumRequiredComparators();
-               this.comparators = new TypeComparatorFactory<?>[numComparators];
-               this.driverKeys = new FieldList[numComparators];
-               this.driverSortOrders = new boolean[numComparators][];
-               
-               // the key information passed to the constructor always describes comparator 0
-               if (numComparators > 0) {
-                       this.driverKeys[0] = driverKeyFields;
-                       this.driverSortOrders[0] = driverSortOrders;
-               }
-               
-               if (this.input.getShipStrategy() == ShipStrategyType.BROADCAST) {
-                       this.input.setReplicationFactor(getParallelism());
-               }
-               
-               // take over the branch plan of the predecessor, if it has one
-               final PlanNode predNode = input.getSource();
-               
-               if (predNode.branchPlan != null && !predNode.branchPlan.isEmpty()) {
-                       if (this.branchPlan == null) {
-                               this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
-                       }
-                       this.branchPlan.putAll(predNode.branchPlan);
-               }
-       }
-
-       // --------------------------------------------------------------------------------------------
-       
-       /**
-        * Gets the template of this node as a {@link SingleInputNode}.
-        *
-        * @return The template optimizer node.
-        * @throws RuntimeException If the template is not a {@link SingleInputNode}.
-        */
-       public SingleInputNode getSingleInputNode() {
-               if (this.template instanceof SingleInputNode) {
-                       return (SingleInputNode) this.template;
-               } else {
-                       throw new RuntimeException("The template of a SingleInputPlanNode is not a SingleInputNode: "
-                                       + (this.template == null ? null : this.template.getClass().getName()));
-               }
-       }
-       
-       /**
-        * Gets the input channel to this node.
-        * 
-        * @return The input channel to this node.
-        */
-       public Channel getInput() {
-               return this.input;
-       }
-       
-       /**
-        * Gets the predecessor of this node, i.e. the source of the input channel.
-        * 
-        * @return The predecessor of this node.
-        */
-       public PlanNode getPredecessor() {
-               return this.input.getSource();
-       }
-       
-       /**
-        * Sets the key field indexes for the specified driver comparator. Every key gets the sort
-        * order {@code true}.
-        * 
-        * @param keys The key field indexes for the specified driver comparator. May be {@code null},
-        *             in which case the sort orders are set to {@code null} as well.
-        * @param id The ID of the driver comparator.
-        */
-       public void setDriverKeyInfo(FieldList keys, int id) {
-               this.setDriverKeyInfo(keys, trueArrayFor(keys), id);
-       }
-       
-       /**
-        * Sets the key field information for the specified driver comparator.
-        * 
-        * @param keys The key field indexes for the specified driver comparator.
-        * @param sortOrder The key sort order for the specified driver comparator.
-        * @param id The ID of the driver comparator.
-        * @throws CompilerException If the id does not denote one of the driver's comparators.
-        */
-       public void setDriverKeyInfo(FieldList keys, boolean[] sortOrder, int id) {
-               if (id < 0 || id >= driverKeys.length) {
-                       throw new CompilerException("Invalid id for driver key information. DriverStrategy requires only "
-                                       + super.getDriverStrategy().getNumRequiredComparators() + " comparators.");
-               }
-               this.driverKeys[id] = keys;
-               this.driverSortOrders[id] = sortOrder;
-       }
-       
-       /**
-        * Gets the key field indexes for the specified driver comparator.
-        * 
-        * @param id The id of the driver comparator for which the key field indexes are requested.
-        * @return The key field indexes of the specified driver comparator.
-        */
-       public FieldList getKeys(int id) {
-               return this.driverKeys[id];
-       }
-       
-       /**
-        * Gets the sort order for the specified driver comparator.
-        * 
-        * @param id The id of the driver comparator for which the sort order is requested.
-        * @return The sort order of the specified driver comparator.
-        */
-       public boolean[] getSortOrders(int id) {
-               return driverSortOrders[id];
-       }
-       
-       /**
-        * Gets the specified comparator from this PlanNode.
-        * 
-        * @param id The ID of the requested comparator.
-        *
-        * @return The specified comparator.
-        */
-       public TypeComparatorFactory<?> getComparator(int id) {
-               return comparators[id];
-       }
-       
-       /**
-        * Sets the specified comparator for this PlanNode.
-        *
-        * @param comparator The comparator to set.
-        * @param id The ID of the comparator to set.
-        */
-       public void setComparator(TypeComparatorFactory<?> comparator, int id) {
-               this.comparators[id] = comparator;
-       }
-       
-       // --------------------------------------------------------------------------------------------
-       
-       /**
-        * Visits this node, then the source of the regular input, then the sources of all
-        * broadcast inputs. Predecessors are only visited if {@code preVisit} returns true.
-        *
-        * @param visitor The visitor to apply.
-        */
-       @Override
-       public void accept(Visitor<PlanNode> visitor) {
-               if (visitor.preVisit(this)) {
-                       this.input.getSource().accept(visitor);
-                       
-                       for (Channel broadcastInput : getBroadcastInputs()) {
-                               broadcastInput.getSource().accept(visitor);
-                       }
-                       
-                       visitor.postVisit(this);
-               }
-       }
-
-       /**
-        * Gets the source of the regular input followed by the sources of all broadcast inputs.
-        *
-        * @return The predecessors of this node.
-        */
-       @Override
-       public Iterable<PlanNode> getPredecessors() {
-               if (getBroadcastInputs() == null || getBroadcastInputs().isEmpty()) {
-                       return Collections.singleton(this.input.getSource());
-               }
-               else {
-                       List<PlanNode> preds = new ArrayList<PlanNode>();
-                       preds.add(input.getSource());
-                       
-                       for (Channel c : getBroadcastInputs()) {
-                               preds.add(c.getSource());
-                       }
-                       
-                       return preds;
-               }
-       }
-
-       /**
-        * Gets the regular input of this node; broadcast inputs are not included.
-        *
-        * @return A collection containing only the regular input channel.
-        */
-       @Override
-       public Iterable<Channel> getInputs() {
-               return Collections.singleton(this.input);
-       }
-
-       /**
-        * Checks whether the given node is reachable from this node through its inputs and whether
-        * a dam lies on the path to it.
-        *
-        * @param source The node to search for.
-        * @return {@code FOUND_SOURCE} if the node was found without a dam on the path,
-        *         {@code FOUND_SOURCE_AND_DAM} if it was found behind a dam, and {@code NOT_FOUND}
-        *         otherwise.
-        */
-       @Override
-       public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
-               if (source == this) {
-                       return FOUND_SOURCE;
-               }
-               SourceAndDamReport res = this.input.getSource().hasDamOnPathDownTo(source);
-               if (res == FOUND_SOURCE_AND_DAM) {
-                       return FOUND_SOURCE_AND_DAM;
-               }
-               else if (res == FOUND_SOURCE) {
-                       // found without a dam so far: this node adds one if its input's local strategy
-                       // dams, its temp mode breaks the pipeline, or the driver is a full dam
-                       return (this.input.getLocalStrategy().dams() || this.input.getTempMode().breaksPipeline() ||
-                                       getDriverStrategy().firstDam() == DamBehavior.FULL_DAM) ?
-                               FOUND_SOURCE_AND_DAM : FOUND_SOURCE;
-               }
-               else {
-                       // NOT_FOUND
-                       // check the broadcast inputs
-                       
-                       for (NamedChannel nc : getBroadcastInputs()) {
-                               SourceAndDamReport bcRes = nc.getSource().hasDamOnPathDownTo(source);
-                               if (bcRes != NOT_FOUND) {
-                                       // broadcast inputs are always dams
-                                       return FOUND_SOURCE_AND_DAM;
-                               }
-                       }
-                       return NOT_FOUND;
-               }
-       }
-       
-       // --------------------------------------------------------------------------------------------
-       
-       /**
-        * Creates a boolean array of the given length in which every entry is {@code true}.
-        *
-        * @param length The length of the array.
-        * @return The array filled with {@code true}.
-        */
-       protected static boolean[] getTrueArray(int length) {
-               final boolean[] a = new boolean[length];
-               for (int i = 0; i < length; i++) {
-                       a[i] = true;
-               }
-               return a;
-       }
-       
-       /**
-        * Creates the all-{@code true} sort order array matching the given keys.
-        *
-        * @param keys The key fields, possibly {@code null}.
-        * @return One {@code true} entry per key, or {@code null} if no keys are given.
-        */
-       private static boolean[] trueArrayFor(FieldList keys) {
-               return keys == null ? null : getTrueArray(keys.size());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
deleted file mode 100644
index 451484d..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plan;
-
-import java.util.List;
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.SinkJoiner;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-
-/**
- * Plan candidate node for the {@link SinkJoiner} utility node. It has two inputs, each of which
- * must come from a {@link SinkPlanNode} or from another sink joiner, and uses the
- * {@link DriverStrategy#BINARY_NO_OP} driver strategy.
- */
-public class SinkJoinerPlanNode extends DualInputPlanNode {
-       
-       /**
-        * Creates a sink joiner candidate with an empty node name.
-        *
-        * @param template The sink joiner optimizer node that this candidate is created for.
-        * @param input1 The first input channel.
-        * @param input2 The second input channel.
-        */
-       public SinkJoinerPlanNode(SinkJoiner template, Channel input1, Channel input2) {
-               super(template, "", input1, input2, DriverStrategy.BINARY_NO_OP);
-       }
-       
-       // --------------------------------------------------------------------------------------------
-       
-       /**
-        * Sets the costs of this node to the sum of the cumulative costs of its two children.
-        *
-        * @param nodeCosts Ignored; the costs are always derived from the two inputs.
-        */
-       @Override
-       public void setCosts(Costs nodeCosts) {
-               // the plan enumeration logic works as for regular two-input-operators, which is important
-               // because of the branch handling logic. it does pick redistributing network channels
-               // between the sink and the sink joiner, because the sink joiner has a different DOP than
-               // the sink. we discard any cost and simply use the sum of the costs from the two children.
-               
-               Costs totalCosts = getInput1().getSource().getCumulativeCosts().clone();
-               totalCosts.addCosts(getInput2().getSource().getCumulativeCosts());
-               super.setCosts(totalCosts);
-       }
-       
-       // --------------------------------------------------------------------------------------------
-       
-       /**
-        * Adds all sinks reachable through this sink joiner to the given list, descending into
-        * nested sink joiners. The first input is processed before the second.
-        *
-        * @param sinks The list to which the sinks are added.
-        * @throws CompilerException If an input is neither a sink nor a sink joiner.
-        */
-       public void getDataSinks(List<SinkPlanNode> sinks) {
-               final PlanNode in1 = this.input1.getSource();
-               final PlanNode in2 = this.input2.getSource();
-               
-               collectDataSinks(in1, sinks);
-               collectDataSinks(in2, sinks);
-       }
-       
-       /**
-        * Adds the given child to the list if it is a sink, or the sinks below it if it is a
-        * sink joiner.
-        */
-       private static void collectDataSinks(PlanNode child, List<SinkPlanNode> sinks) {
-               if (child instanceof SinkPlanNode) {
-                       sinks.add((SinkPlanNode) child);
-               } else if (child instanceof SinkJoinerPlanNode) {
-                       ((SinkJoinerPlanNode) child).getDataSinks(sinks);
-               } else {
-                       throw new CompilerException("Illegal child node for a sink joiner utility node: Neither Sink nor Sink Joiner");
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
deleted file mode 100644
index 656e67f..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import org.apache.flink.optimizer.dag.DataSinkNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
- * Plan candidate node for data flow sinks.
- */
-public class SinkPlanNode extends SingleInputPlanNode
-{
-       /**
-        * Constructs a new sink candidate node that uses {@link DriverStrategy#NONE} as its
-        * driver strategy. Note that local sorting and range partitioning are handled by the
-        * incoming channel already. The node's global and local properties are clones of the
-        * properties of the input channel.
-        * 
-        * @param template The template optimizer node that this candidate is created for.
-        * @param nodeName The name of the node.
-        * @param input The channel that delivers the data to the sink.
-        */
-       public SinkPlanNode(DataSinkNode template, String nodeName, Channel input) {
-               super(template, nodeName, input, DriverStrategy.NONE);
-               
-               this.globalProps = input.getGlobalProperties().clone();
-               this.localProps = input.getLocalProperties().clone();
-       }
-       
-       /**
-        * Gets the template of this node as a {@link DataSinkNode}.
-        *
-        * @return The data sink optimizer node.
-        * @throws RuntimeException If the template is not a {@link DataSinkNode}.
-        */
-       public DataSinkNode getSinkNode() {
-               if (this.template instanceof DataSinkNode) {
-                       return (DataSinkNode) this.template;
-               } else {
-                       throw new RuntimeException("The template of a SinkPlanNode is not a DataSinkNode: "
-                                       + (this.template == null ? null : this.template.getClass().getName()));
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java
deleted file mode 100644
index 63093dd..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
-
-import java.util.Collections;
-import java.util.HashMap;
-
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.SolutionSetNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.util.Visitor;
-
-/**
- * Plan candidate node that stands for the solution set inside a workset iteration. It is
- * created for a {@link SolutionSetNode} and knows the {@link WorksetIterationPlanNode} that
- * contains it.
- */
-public class SolutionSetPlanNode extends PlanNode {
-       
-       /** Cost object shared by all instances; the node itself incurs no cost. */
-       private static final Costs NO_COSTS = new Costs();
-       
-       private WorksetIterationPlanNode containingIterationNode;
-       
-       private final Channel initialInput;
-       
-       public Object postPassHelper;
-       
-       
-       /**
-        * Creates the candidate with the given properties and zero costs, taking over the branch
-        * plan of the initial input's source if that source has a non-empty one.
-        *
-        * @param template The solution set optimizer node that this candidate is created for.
-        * @param nodeName The name of the node.
-        * @param gProps The global properties of the node.
-        * @param lProps The local properties of the node.
-        * @param initialInput The channel of the initial input.
-        */
-       public SolutionSetPlanNode(SolutionSetNode template, String nodeName,
-                       GlobalProperties gProps, LocalProperties lProps,
-                       Channel initialInput)
-       {
-               super(template, nodeName, DriverStrategy.NONE);
-               
-               this.initialInput = initialInput;
-               this.globalProps = gProps;
-               this.localProps = lProps;
-               
-               // neither the node nor its (non-existent) predecessors cost anything
-               this.cumulativeCosts = NO_COSTS;
-               this.nodeCosts = NO_COSTS;
-               
-               final PlanNode initialSource = initialInput.getSource();
-               final boolean sourceHasBranches =
-                               initialSource.branchPlan != null && !initialSource.branchPlan.isEmpty();
-               
-               if (sourceHasBranches) {
-                       if (this.branchPlan == null) {
-                               this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
-                       }
-                       this.branchPlan.putAll(initialSource.branchPlan);
-               }
-       }
-       
-       // --------------------------------------------------------------------------------------------
-       
-       /**
-        * Gets the template of this node, cast to {@link SolutionSetNode}.
-        *
-        * @return The solution set optimizer node.
-        */
-       public SolutionSetNode getSolutionSetNode() {
-               return (SolutionSetNode) this.template;
-       }
-       
-       /**
-        * Gets the iteration node that contains this solution set.
-        *
-        * @return The containing workset iteration, or null if none was set.
-        */
-       public WorksetIterationPlanNode getContainingIterationNode() {
-               return this.containingIterationNode;
-       }
-       
-       /**
-        * Sets the iteration node that contains this solution set.
-        *
-        * @param containingIterationNode The containing workset iteration.
-        */
-       public void setContainingIterationNode(WorksetIterationPlanNode containingIterationNode) {
-               this.containingIterationNode = containingIterationNode;
-       }
-
-       // --------------------------------------------------------------------------------------------
-       
-       /**
-        * Visits only this node; the initial input is not descended into.
-        *
-        * @param visitor The visitor to apply.
-        */
-       @Override
-       public void accept(Visitor<PlanNode> visitor) {
-               if (!visitor.preVisit(this)) {
-                       return;
-               }
-               visitor.postVisit(this);
-       }
-
-       /**
-        * @return Always an empty list; this node reports no predecessors.
-        */
-       @Override
-       public Iterable<PlanNode> getPredecessors() {
-               return Collections.<PlanNode>emptyList();
-       }
-
-       /**
-        * @return Always an empty list; this node reports no inputs.
-        */
-       @Override
-       public Iterable<Channel> getInputs() {
-               return Collections.<Channel>emptyList();
-       }
-
-       /**
-        * Checks whether the given node is this node or is reachable through the initial input.
-        * Whenever the node is found, the path is reported as containing a dam.
-        *
-        * @param source The node to search for.
-        * @return {@code FOUND_SOURCE_AND_DAM} if the node was found, {@code NOT_FOUND} otherwise.
-        */
-       @Override
-       public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
-               if (source == this) {
-                       return FOUND_SOURCE_AND_DAM;
-               }
-               
-               final SourceAndDamReport upstream = this.initialInput.getSource().hasDamOnPathDownTo(source);
-               final boolean found = upstream == FOUND_SOURCE || upstream == FOUND_SOURCE_AND_DAM;
-               return found ? FOUND_SOURCE_AND_DAM : NOT_FOUND;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
deleted file mode 100644
index 11b7cc9..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
-
-import java.util.Collections;
-
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.optimizer.dag.DataSourceNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.util.Visitor;
-
-/**
- * Plan candidate node for data flow sources. A source has no inputs and no predecessors and
- * uses {@link DriverStrategy#NONE} as its driver strategy.
- */
-public class SourcePlanNode extends PlanNode {
-       
-       private TypeSerializerFactory<?> serializer;
-       
-       /**
-        * Constructs a new source candidate node with freshly created, default global and local
-        * properties.
-        * 
-        * @param template The template optimizer node that this candidate is created for.
-        * @param nodeName The name of the node.
-        */
-       public SourcePlanNode(DataSourceNode template, String nodeName) {
-               this(template, nodeName, new GlobalProperties(), new LocalProperties());
-       }
-
-       /**
-        * Constructs a new source candidate node with the given properties, which are then updated
-        * with the unique fields of the template.
-        *
-        * @param template The template optimizer node that this candidate is created for.
-        * @param nodeName The name of the node.
-        * @param gprops The global properties of the source.
-        * @param lprops The local properties of the source.
-        */
-       public SourcePlanNode(DataSourceNode template, String nodeName,
-                       GlobalProperties gprops, LocalProperties lprops)
-       {
-               super(template, nodeName, DriverStrategy.NONE);
-
-               this.globalProps = gprops;
-               this.localProps = lprops;
-               updatePropertiesWithUniqueSets(template.getUniqueFields());
-       }
-
-       // --------------------------------------------------------------------------------------------
-       
-       /**
-        * Gets the template of this node, cast to {@link DataSourceNode}.
-        *
-        * @return The data source optimizer node.
-        */
-       public DataSourceNode getDataSourceNode() {
-               return (DataSourceNode) this.template;
-       }
-       
-       /**
-        * Gets the serializer from this PlanNode.
-        *
-        * @return The serializer.
-        */
-       public TypeSerializerFactory<?> getSerializer() {
-               return this.serializer;
-       }
-       
-       /**
-        * Sets the serializer for this PlanNode.
-        *
-        * @param serializer The serializer to set.
-        */
-       public void setSerializer(TypeSerializerFactory<?> serializer) {
-               this.serializer = serializer;
-       }
-
-       // --------------------------------------------------------------------------------------------
-       
-       /**
-        * Visits only this node, since a source has no predecessors to descend into.
-        *
-        * @param visitor The visitor to apply.
-        */
-       @Override
-       public void accept(Visitor<PlanNode> visitor) {
-               if (!visitor.preVisit(this)) {
-                       return;
-               }
-               visitor.postVisit(this);
-       }
-
-       /**
-        * @return Always an empty list; a source has no predecessors.
-        */
-       @Override
-       public Iterable<PlanNode> getPredecessors() {
-               return Collections.<PlanNode>emptyList();
-       }
-
-       /**
-        * @return Always an empty list; a source has no inputs.
-        */
-       @Override
-       public Iterable<Channel> getInputs() {
-               return Collections.<Channel>emptyList();
-       }
-
-       /**
-        * Checks whether the given node is this source.
-        *
-        * @param source The node to search for.
-        * @return {@code FOUND_SOURCE} if the given node is this node, {@code NOT_FOUND} otherwise.
-        */
-       @Override
-       public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
-               return (source == this) ? FOUND_SOURCE : NOT_FOUND;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
deleted file mode 100644
index 880f2e3..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plan;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.flink.runtime.jobgraph.JobGraph;
-
-/**
- * Abstract base class representing Flink streaming plans. Subclasses provide the plan as a
- * {@link JobGraph} and as JSON.
- */
-public abstract class StreamingPlan implements FlinkPlan {
-
-       /**
-        * Gets the job graph of this plan.
-        *
-        * @return The job graph.
-        */
-       public abstract JobGraph getJobGraph();
-
-       /**
-        * Gets the streaming plan as a JSON string.
-        *
-        * @return The JSON representation of the plan.
-        */
-       public abstract String getStreamingPlanAsJSON();
-
-       /**
-        * Dumps the JSON representation of the streaming plan; presumably it is written to the
-        * given file - confirm against the implementations.
-        *
-        * @param file The file given as the target of the dump.
-        * @throws IOException Declared for I/O failures during the dump.
-        */
-       public abstract void dumpStreamingPlanAsJSON(File file) throws 
IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java
deleted file mode 100644
index 95adace..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plan;
-
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.WorksetIterationNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.util.Visitor;
-
-/**
- * A node in the execution, representing a workset iteration (delta iteration).
- */
-public class WorksetIterationPlanNode extends DualInputPlanNode implements 
IterationPlanNode {
-
-       private final SolutionSetPlanNode solutionSetPlanNode;
-       
-       private final WorksetPlanNode worksetPlanNode;
-       
-       private final PlanNode solutionSetDeltaPlanNode;
-       
-       private final PlanNode nextWorkSetPlanNode;
-       
-       private TypeSerializerFactory<?> worksetSerializer;
-       
-       private TypeSerializerFactory<?> solutionSetSerializer;
-       
-       private TypeComparatorFactory<?> solutionSetComparator;
-       
-       private boolean immediateSolutionSetUpdate;
-       
-       public Object postPassHelper;
-       
-       private TypeSerializerFactory<?> serializerForIterationChannel;
-       
-       // 
--------------------------------------------------------------------------------------------
-
-       public WorksetIterationPlanNode(WorksetIterationNode template, String 
nodeName, Channel initialSolutionSet, Channel initialWorkset,
-                       SolutionSetPlanNode solutionSetPlanNode, 
WorksetPlanNode worksetPlanNode,
-                       PlanNode nextWorkSetPlanNode, PlanNode 
solutionSetDeltaPlanNode)
-       {
-               super(template, nodeName, initialSolutionSet, initialWorkset, 
DriverStrategy.BINARY_NO_OP);
-               this.solutionSetPlanNode = solutionSetPlanNode;
-               this.worksetPlanNode = worksetPlanNode;
-               this.solutionSetDeltaPlanNode = solutionSetDeltaPlanNode;
-               this.nextWorkSetPlanNode = nextWorkSetPlanNode;
-               
-               mergeBranchPlanMaps();
-
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       public WorksetIterationNode getIterationNode() {
-               if (this.template instanceof WorksetIterationNode) {
-                       return (WorksetIterationNode) this.template;
-               } else {
-                       throw new RuntimeException();
-               }
-       }
-       
-       public SolutionSetPlanNode getSolutionSetPlanNode() {
-               return this.solutionSetPlanNode;
-       }
-       
-       public WorksetPlanNode getWorksetPlanNode() {
-               return this.worksetPlanNode;
-       }
-       
-       public PlanNode getSolutionSetDeltaPlanNode() {
-               return this.solutionSetDeltaPlanNode;
-       }
-       
-       public PlanNode getNextWorkSetPlanNode() {
-               return this.nextWorkSetPlanNode;
-       }
-       
-       public Channel getInitialSolutionSetInput() {
-               return getInput1();
-       }
-       
-       public Channel getInitialWorksetInput() {
-               return getInput2();
-       }
-       
-       public void setImmediateSolutionSetUpdate(boolean immediateUpdate) {
-               this.immediateSolutionSetUpdate = immediateUpdate;
-       }
-       
-       public boolean isImmediateSolutionSetUpdate() {
-               return this.immediateSolutionSetUpdate;
-       }
-       
-       public FieldList getSolutionSetKeyFields() {
-               return getIterationNode().getSolutionSetKeyFields();
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public TypeSerializerFactory<?> getWorksetSerializer() {
-               return worksetSerializer;
-       }
-       
-       public void setWorksetSerializer(TypeSerializerFactory<?> 
worksetSerializer) {
-               this.worksetSerializer = worksetSerializer;
-       }
-       
-       public TypeSerializerFactory<?> getSolutionSetSerializer() {
-               return solutionSetSerializer;
-       }
-       
-       public void setSolutionSetSerializer(TypeSerializerFactory<?> 
solutionSetSerializer) {
-               this.solutionSetSerializer = solutionSetSerializer;
-       }
-       
-       public TypeComparatorFactory<?> getSolutionSetComparator() {
-               return solutionSetComparator;
-       }
-       
-       public void setSolutionSetComparator(TypeComparatorFactory<?> 
solutionSetComparator) {
-               this.solutionSetComparator = solutionSetComparator;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       public void setCosts(Costs nodeCosts) {
-               // add the costs from the step function
-               
nodeCosts.addCosts(this.solutionSetDeltaPlanNode.getCumulativeCostsShare());
-               
nodeCosts.addCosts(this.nextWorkSetPlanNode.getCumulativeCostsShare());
-
-               super.setCosts(nodeCosts);
-       }
-       
-       public int getMemoryConsumerWeight() {
-               // solution set index and workset back channel
-               return 2;
-       }
-       
-       @Override
-       public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
-               if (source == this) {
-                       return FOUND_SOURCE;
-               }
-               
-               SourceAndDamReport fromOutside = 
super.hasDamOnPathDownTo(source);
-
-               if (fromOutside == FOUND_SOURCE_AND_DAM) {
-                       return FOUND_SOURCE_AND_DAM;
-               }
-               else if (fromOutside == FOUND_SOURCE) {
-                       // we always have a dam in the solution set index
-                       return FOUND_SOURCE_AND_DAM;
-               } else {
-                       SourceAndDamReport fromNextWorkset = 
nextWorkSetPlanNode.hasDamOnPathDownTo(source);
-
-                       if (fromNextWorkset == FOUND_SOURCE_AND_DAM){
-                               return FOUND_SOURCE_AND_DAM;
-                       } else if (fromNextWorkset == FOUND_SOURCE){
-                               return FOUND_SOURCE_AND_DAM;
-                       } else {
-                               return 
this.solutionSetDeltaPlanNode.hasDamOnPathDownTo(source);
-                       }
-               }
-       }
-
-       @Override
-       public void acceptForStepFunction(Visitor<PlanNode> visitor) {
-               this.solutionSetDeltaPlanNode.accept(visitor);
-               this.nextWorkSetPlanNode.accept(visitor);
-       }
-
-       /**
-        * Merging can only take place after the solutionSetDelta and 
nextWorkset PlanNode has been set,
-        * because they can contain also some of the branching nodes.
-        */
-       @Override
-       protected void mergeBranchPlanMaps(Map<OptimizerNode, PlanNode> 
branchPlan1, Map<OptimizerNode,PlanNode> branchPlan2) {}
-
-       
-       protected void mergeBranchPlanMaps() {
-               Map<OptimizerNode, PlanNode> branchPlan1 = 
input1.getSource().branchPlan;
-               Map<OptimizerNode, PlanNode> branchPlan2 = 
input2.getSource().branchPlan;
-
-               // merge the branchPlan maps according the template's 
uncloseBranchesStack
-               if (this.template.hasUnclosedBranches()) {
-                       if (this.branchPlan == null) {
-                               this.branchPlan = new HashMap<OptimizerNode, 
PlanNode>(8);
-                       }
-
-                       for (OptimizerNode.UnclosedBranchDescriptor uc : 
this.template.getOpenBranches()) {
-                               OptimizerNode brancher = uc.getBranchingNode();
-                               PlanNode selectedCandidate = null;
-
-                               if (branchPlan1 != null) {
-                                       // predecessor 1 has branching 
children, see if it got the branch we are looking for
-                                       selectedCandidate = 
branchPlan1.get(brancher);
-                               }
-
-                               if (selectedCandidate == null && branchPlan2 != 
null) {
-                                       // predecessor 2 has branching 
children, see if it got the branch we are looking for
-                                       selectedCandidate = 
branchPlan2.get(brancher);
-                               }
-
-                               if(selectedCandidate == null && 
getSolutionSetDeltaPlanNode() != null && getSolutionSetDeltaPlanNode()
-                                               .branchPlan != null){
-                                       selectedCandidate = 
getSolutionSetDeltaPlanNode().branchPlan.get(brancher);
-                               }
-
-                               if(selectedCandidate == null && 
getNextWorkSetPlanNode() != null && getNextWorkSetPlanNode()
-                                               .branchPlan != null){
-                                       selectedCandidate = 
getNextWorkSetPlanNode().branchPlan.get(brancher);
-                               }
-
-                               if (selectedCandidate == null) {
-                                       throw new CompilerException(
-                                                       "Candidates for a node 
with open branches are missing information about the selected candidate ");
-                               }
-
-                               this.branchPlan.put(brancher, 
selectedCandidate);
-                       }
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public TypeSerializerFactory<?> getSerializerForIterationChannel() {
-               return serializerForIterationChannel;
-       }
-       
-       public void setSerializerForIterationChannel(TypeSerializerFactory<?> 
serializerForIterationChannel) {
-               this.serializerForIterationChannel = 
serializerForIterationChannel;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java
deleted file mode 100644
index 8d044d6..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
-
-import java.util.Collections;
-import java.util.HashMap;
-
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.WorksetNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.runtime.operators.DamBehavior;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.util.Visitor;
-
-/**
- * Plan candidate node for partial solution of a bulk iteration.
- */
-public class WorksetPlanNode extends PlanNode {
-       
-       private static final Costs NO_COSTS = new Costs();
-       
-       private WorksetIterationPlanNode containingIterationNode;
-       
-       private final Channel initialInput;
-       
-       public Object postPassHelper;
-       
-       
-       public WorksetPlanNode(WorksetNode template, String nodeName,
-                       GlobalProperties gProps, LocalProperties lProps,
-                       Channel initialInput)
-       {
-               super(template, nodeName, DriverStrategy.NONE);
-               
-               this.globalProps = gProps;
-               this.localProps = lProps;
-               this.initialInput = initialInput;
-               
-               // the node incurs no cost
-               this.nodeCosts = NO_COSTS;
-               this.cumulativeCosts = NO_COSTS;
-               
-               if (initialInput.getSource().branchPlan != null && 
initialInput.getSource().branchPlan.size() > 0) {
-                       if (this.branchPlan == null) {
-                               this.branchPlan = new HashMap<OptimizerNode, 
PlanNode>();
-                       }
-                       
-                       
this.branchPlan.putAll(initialInput.getSource().branchPlan);
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public WorksetNode getWorksetNode() {
-               return (WorksetNode) this.template;
-       }
-       
-       public WorksetIterationPlanNode getContainingIterationNode() {
-               return this.containingIterationNode;
-       }
-       
-       public void setContainingIterationNode(WorksetIterationPlanNode 
containingIterationNode) {
-               this.containingIterationNode = containingIterationNode;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-
-       @Override
-       public void accept(Visitor<PlanNode> visitor) {
-               if (visitor.preVisit(this)) {
-                       visitor.postVisit(this);
-               }
-       }
-
-
-       @Override
-       public Iterable<PlanNode> getPredecessors() {
-               return Collections.<PlanNode>emptyList();
-       }
-
-
-       @Override
-       public Iterable<Channel> getInputs() {
-               return Collections.<Channel>emptyList();
-       }
-
-
-       @Override
-       public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
-               if (source == this) {
-                       return FOUND_SOURCE;
-               }
-               SourceAndDamReport res = 
this.initialInput.getSource().hasDamOnPathDownTo(source);
-               if (res == FOUND_SOURCE_AND_DAM) {
-                       return FOUND_SOURCE_AND_DAM;
-               }
-               else if (res == FOUND_SOURCE) {
-                       return (this.initialInput.getLocalStrategy().dams() || 
-                                       
this.initialInput.getTempMode().breaksPipeline() ||
-                                       getDriverStrategy().firstDam() == 
DamBehavior.FULL_DAM) ?
-                               FOUND_SOURCE_AND_DAM : FOUND_SOURCE;
-               }
-               else {
-                       return NOT_FOUND;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java
deleted file mode 100644
index 3f8cb46..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plandump;
-
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-
-
-/**
- *
- */
-public interface DumpableConnection<T extends DumpableNode<T>> {
-
-       public DumpableNode<T> getSource();
-       
-       public ShipStrategyType getShipStrategy();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java
deleted file mode 100644
index 1bc0f0c..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plandump;
-
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.plan.PlanNode;
-
-/**
- *
- */
-public interface DumpableNode<T extends DumpableNode<T>> {
-       
-       /**
-        * Gets an iterator over the predecessors.
-        * 
-        * @return An iterator over the predecessors.
-        */
-       Iterable<T> getPredecessors();
-       
-       Iterable<DumpableConnection<T>> getDumpableInputs();
-       
-       OptimizerNode getOptimizerNode();
-       
-       PlanNode getPlanNode();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
deleted file mode 100644
index 6f918c0..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
+++ /dev/null
@@ -1,657 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plandump;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.flink.api.common.operators.CompilerHints;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.BinaryUnionNode;
-import org.apache.flink.optimizer.dag.BulkIterationNode;
-import org.apache.flink.optimizer.dag.DataSinkNode;
-import org.apache.flink.optimizer.dag.DataSourceNode;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.DagConnection;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.dag.WorksetIterationNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.util.Utils;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.StringUtils;
-
-/**
- * 
- */
-public class PlanJSONDumpGenerator {
-       
-       private Map<DumpableNode<?>, Integer> nodeIds; // resolves pact nodes 
to ids
-
-       private int nodeCnt;
-       
-       private boolean encodeForHTML;
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       public void setEncodeForHTML(boolean encodeForHTML) {
-               this.encodeForHTML = encodeForHTML;
-       }
-       
-       public boolean isEncodeForHTML() {
-               return encodeForHTML;
-       }
-       
-       
-       public void dumpPactPlanAsJSON(List<DataSinkNode> nodes, PrintWriter 
writer) {
-               @SuppressWarnings("unchecked")
-               List<DumpableNode<?>> n = (List<DumpableNode<?>>) (List<?>) 
nodes;
-               compilePlanToJSON(n, writer);
-       }
-       
-       public String getPactPlanAsJSON(List<DataSinkNode> nodes) {
-               StringWriter sw = new StringWriter();
-               PrintWriter pw = new PrintWriter(sw);
-               dumpPactPlanAsJSON(nodes, pw);
-               return sw.toString();
-       }
-       
-       public void dumpOptimizerPlanAsJSON(OptimizedPlan plan, File toFile) 
throws IOException {
-               PrintWriter pw = null;
-               try {
-                       pw = new PrintWriter(new FileOutputStream(toFile), 
false);
-                       dumpOptimizerPlanAsJSON(plan, pw);
-                       pw.flush();
-               } finally {
-                       if (pw != null) {
-                               pw.close();
-                       }
-               }
-       }
-       
-       public String getOptimizerPlanAsJSON(OptimizedPlan plan) {
-               StringWriter sw = new StringWriter();
-               PrintWriter pw = new PrintWriter(sw);
-               dumpOptimizerPlanAsJSON(plan, pw);
-               pw.close();
-               return sw.toString();
-       }
-       
-       public void dumpOptimizerPlanAsJSON(OptimizedPlan plan, PrintWriter 
writer) {
-               Collection<SinkPlanNode> sinks = plan.getDataSinks();
-               if (sinks instanceof List) {
-                       dumpOptimizerPlanAsJSON((List<SinkPlanNode>) sinks, 
writer);
-               } else {
-                       List<SinkPlanNode> n = new ArrayList<SinkPlanNode>();
-                       n.addAll(sinks);
-                       dumpOptimizerPlanAsJSON(n, writer);
-               }
-       }
-       
-       public void dumpOptimizerPlanAsJSON(List<SinkPlanNode> nodes, 
PrintWriter writer) {
-               @SuppressWarnings("unchecked")
-               List<DumpableNode<?>> n = (List<DumpableNode<?>>) (List<?>) 
nodes;
-               compilePlanToJSON(n, writer);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       private void compilePlanToJSON(List<DumpableNode<?>> nodes, PrintWriter 
writer) {
-               // initialization to assign node ids
-               this.nodeIds = new HashMap<DumpableNode<?>, Integer>();
-               this.nodeCnt = 0;
-               
-               // JSON header
-               writer.print("{\n\t\"nodes\": [\n\n");
-
-               // Generate JSON for plan
-               for (int i = 0; i < nodes.size(); i++) {
-                       visit(nodes.get(i), writer, i == 0);
-               }
-               
-               // JSON Footer
-               writer.println("\n\t]\n}");
-       }
-
-       private boolean visit(DumpableNode<?> node, PrintWriter writer, boolean 
first) {
-               // check for duplicate traversal
-               if (this.nodeIds.containsKey(node)) {
-                       return false;
-               }
-               
-               // assign an id first
-               this.nodeIds.put(node, this.nodeCnt++);
-               
-               // then recurse
-               for (DumpableNode<?> child : node.getPredecessors()) {
-                       //This is important, because when the node was already 
in the graph it is not allowed
-                       //to set first to false!
-                       if (visit(child, writer, first)) {
-                               first = false;
-                       };
-               }
-               
-               // check if this node should be skipped from the dump
-               final OptimizerNode n = node.getOptimizerNode();
-               
-               // ------------------ dump after the ascend 
---------------------
-               // start a new node and output node id
-               if (!first) {
-                       writer.print(",\n");    
-               }
-               // open the node
-               writer.print("\t{\n");
-               
-               // recurse, it is is an iteration node
-               if (node instanceof BulkIterationNode || node instanceof 
BulkIterationPlanNode) {
-                       
-                       DumpableNode<?> innerChild = node instanceof 
BulkIterationNode ?
-                                       ((BulkIterationNode) 
node).getNextPartialSolution() :
-                                       ((BulkIterationPlanNode) 
node).getRootOfStepFunction();
-                                       
-                       DumpableNode<?> begin = node instanceof 
BulkIterationNode ?
-                               ((BulkIterationNode) node).getPartialSolution() 
:
-                               ((BulkIterationPlanNode) 
node).getPartialSolutionPlanNode();
-                       
-                       writer.print("\t\t\"step_function\": [\n");
-                       
-                       visit(innerChild, writer, true);
-                       
-                       writer.print("\n\t\t],\n");
-                       writer.print("\t\t\"partial_solution\": " + 
this.nodeIds.get(begin) + ",\n");
-                       writer.print("\t\t\"next_partial_solution\": " + 
this.nodeIds.get(innerChild) + ",\n");
-               } else if (node instanceof WorksetIterationNode || node 
instanceof WorksetIterationPlanNode) {
-                       
-                       DumpableNode<?> worksetRoot = node instanceof 
WorksetIterationNode ?
-                                       ((WorksetIterationNode) 
node).getNextWorkset() :
-                                       ((WorksetIterationPlanNode) 
node).getNextWorkSetPlanNode();
-                       DumpableNode<?> solutionDelta = node instanceof 
WorksetIterationNode ?
-                                       ((WorksetIterationNode) 
node).getSolutionSetDelta() :
-                                       ((WorksetIterationPlanNode) 
node).getSolutionSetDeltaPlanNode();
-                                       
-                       DumpableNode<?> workset = node instanceof 
WorksetIterationNode ?
-                                               ((WorksetIterationNode) 
node).getWorksetNode() :
-                                               ((WorksetIterationPlanNode) 
node).getWorksetPlanNode();
-                       DumpableNode<?> solutionSet = node instanceof 
WorksetIterationNode ?
-                                               ((WorksetIterationNode) 
node).getSolutionSetNode() :
-                                               ((WorksetIterationPlanNode) 
node).getSolutionSetPlanNode();
-                       
-                       writer.print("\t\t\"step_function\": [\n");
-                       
-                       visit(worksetRoot, writer, true);
-                       visit(solutionDelta, writer, false);
-                       
-                       writer.print("\n\t\t],\n");
-                       writer.print("\t\t\"workset\": " + 
this.nodeIds.get(workset) + ",\n");
-                       writer.print("\t\t\"solution_set\": " + 
this.nodeIds.get(solutionSet) + ",\n");
-                       writer.print("\t\t\"next_workset\": " + 
this.nodeIds.get(worksetRoot) + ",\n");
-                       writer.print("\t\t\"solution_delta\": " + 
this.nodeIds.get(solutionDelta) + ",\n");
-               }
-               
-               // print the id
-               writer.print("\t\t\"id\": " + this.nodeIds.get(node));
-
-               
-               final String type;
-               String contents;
-               if (n instanceof DataSinkNode) {
-                       type = "sink";
-                       contents = n.getOperator().toString();
-               } else if (n instanceof DataSourceNode) {
-                       type = "source";
-                       contents = n.getOperator().toString();
-               }
-               else if (n instanceof BulkIterationNode) {
-                       type = "bulk_iteration";
-                       contents = n.getOperator().getName();
-               }
-               else if (n instanceof WorksetIterationNode) {
-                       type = "workset_iteration";
-                       contents = n.getOperator().getName();
-               }
-               else if (n instanceof BinaryUnionNode) {
-                       type = "pact";
-                       contents = "";
-               }
-               else {
-                       type = "pact";
-                       contents = n.getOperator().getName();
-               }
-               
-               contents = StringUtils.showControlCharacters(contents);
-               if (encodeForHTML) {
-                       contents = StringEscapeUtils.escapeHtml4(contents);
-                       contents = contents.replace("\\", "&#92;");
-               }
-               
-               
-               String name = n.getName();
-               if (name.equals("Reduce") && (node instanceof 
SingleInputPlanNode) && 
-                               ((SingleInputPlanNode) 
node).getDriverStrategy() == DriverStrategy.SORTED_GROUP_COMBINE) {
-                       name = "Combine";
-               }
-               
-               // output the type identifier
-               writer.print(",\n\t\t\"type\": \"" + type + "\"");
-               
-               // output node name
-               writer.print(",\n\t\t\"pact\": \"" + name + "\"");
-               
-               // output node contents
-               writer.print(",\n\t\t\"contents\": \"" + contents + "\"");
-
-               // degree of parallelism
-               writer.print(",\n\t\t\"parallelism\": \""
-                       + (n.getParallelism() >= 1 ? n.getParallelism() : 
"default") + "\"");
-               
-               // output node predecessors
-               Iterator<? extends DumpableConnection<?>> inConns = 
node.getDumpableInputs().iterator();
-               String child1name = "", child2name = "";
-
-               if (inConns != null && inConns.hasNext()) {
-                       // start predecessor list
-                       writer.print(",\n\t\t\"predecessors\": [");
-                       int inputNum = 0;
-                       
-                       while (inConns.hasNext()) {
-                               final DumpableConnection<?> inConn = 
inConns.next();
-                               final DumpableNode<?> source = 
inConn.getSource();
-                               writer.print(inputNum == 0 ? "\n" : ",\n");
-                               if (inputNum == 0) {
-                                       child1name += child1name.length() > 0 ? 
", " : ""; 
-                                       child1name += 
source.getOptimizerNode().getOperator().getName();
-                               } else if (inputNum == 1) {
-                                       child2name += child2name.length() > 0 ? 
", " : ""; 
-                                       child2name = 
source.getOptimizerNode().getOperator().getName();
-                               }
-
-                               // output predecessor id
-                               writer.print("\t\t\t{\"id\": " + 
this.nodeIds.get(source));
-
-                               // output connection side
-                               if (inConns.hasNext() || inputNum > 0) {
-                                       writer.print(", \"side\": \"" + 
(inputNum == 0 ? "first" : "second") + "\"");
-                               }
-                               // output shipping strategy and channel type
-                               final Channel channel = (inConn instanceof 
Channel) ? (Channel) inConn : null; 
-                               final ShipStrategyType shipType = channel != 
null ? channel.getShipStrategy() :
-                                               ((DagConnection) 
inConn).getShipStrategy();
-                                       
-                               String shipStrategy = null;
-                               if (shipType != null) {
-                                       switch (shipType) {
-                                       case NONE:
-                                               // nothing
-                                               break;
-                                       case FORWARD:
-                                               shipStrategy = "Forward";
-                                               break;
-                                       case BROADCAST:
-                                               shipStrategy = "Broadcast";
-                                               break;
-                                       case PARTITION_HASH:
-                                               shipStrategy = "Hash Partition";
-                                               break;
-                                       case PARTITION_RANGE:
-                                               shipStrategy = "Range 
Partition";
-                                               break;
-                                       case PARTITION_RANDOM:
-                                               shipStrategy = "Redistribute";
-                                               break;
-                                       case PARTITION_FORCED_REBALANCE:
-                                               shipStrategy = "Rebalance";
-                                               break;
-                                       case PARTITION_CUSTOM:
-                                               shipStrategy = "Custom 
Partition";
-                                               break;
-                                       default:
-                                               throw new 
CompilerException("Unknown ship strategy '" + inConn.getShipStrategy().name()
-                                                       + "' in JSON 
generator.");
-                                       }
-                               }
-                               
-                               if (channel != null && 
channel.getShipStrategyKeys() != null && channel.getShipStrategyKeys().size() > 
0) {
-                                       shipStrategy += " on " + 
(channel.getShipStrategySortOrder() == null ?
-                                                       
channel.getShipStrategyKeys().toString() :
-                                                       
Utils.createOrdering(channel.getShipStrategyKeys(), 
channel.getShipStrategySortOrder()).toString());
-                               }
-
-                               if (shipStrategy != null) {
-                                       writer.print(", \"ship_strategy\": \"" 
+ shipStrategy + "\"");
-                               }
-                               
-                               if (channel != null) {
-                                       String localStrategy = null;
-                                       switch (channel.getLocalStrategy()) {
-                                       case NONE:
-                                               break;
-                                       case SORT:
-                                               localStrategy = "Sort";
-                                               break;
-                                       case COMBININGSORT:
-                                               localStrategy = "Sort 
(combining)";
-                                               break;
-                                       default:
-                                               throw new 
CompilerException("Unknown local strategy " + 
channel.getLocalStrategy().name());
-                                       }
-                                       
-                                       if (channel != null && 
channel.getLocalStrategyKeys() != null && channel.getLocalStrategyKeys().size() 
> 0) {
-                                               localStrategy += " on " + 
(channel.getLocalStrategySortOrder() == null ?
-                                                               
channel.getLocalStrategyKeys().toString() :
-                                                               
Utils.createOrdering(channel.getLocalStrategyKeys(), 
channel.getLocalStrategySortOrder()).toString());
-                                       }
-                                       
-                                       if (localStrategy != null) {
-                                               writer.print(", 
\"local_strategy\": \"" + localStrategy + "\"");
-                                       }
-                                       
-                                       if (channel != null && 
channel.getTempMode() != TempMode.NONE) {
-                                               String tempMode = 
channel.getTempMode().toString();
-                                               writer.print(", \"temp_mode\": 
\"" + tempMode + "\"");
-                                       }
-                               }
-                               
-                               writer.print('}');
-                               inputNum++;
-                       }
-                       // finish predecessors
-                       writer.print("\n\t\t]");
-               }
-               
-               
//---------------------------------------------------------------------------------------
-               // the part below here is relevant only to plan nodes with 
concrete strategies, etc
-               
//---------------------------------------------------------------------------------------
-
-               final PlanNode p = node.getPlanNode();
-               if (p == null) {
-                       // finish node
-                       writer.print("\n\t}");
-                       return true;
-               }
-               // local strategy
-               String locString = null;
-               if (p.getDriverStrategy() != null) {
-                       switch (p.getDriverStrategy()) {
-                       case NONE:
-                       case BINARY_NO_OP:
-                               break;
-                               
-                       case UNARY_NO_OP:
-                               locString = "No-Op";
-                               break;
-                               
-                       case COLLECTOR_MAP:
-                       case MAP:
-                               locString = "Map";
-                               break;
-                               
-                       case FLAT_MAP:
-                               locString = "FlatMap";
-                               break;
-                               
-                       case MAP_PARTITION:
-                               locString = "Map Partition";
-                               break;
-                       
-                       case ALL_REDUCE:
-                               locString = "Reduce All";
-                               break;
-                       
-                       case ALL_GROUP_REDUCE:
-                       case ALL_GROUP_REDUCE_COMBINE:
-                               locString = "Group Reduce All";
-                               break;
-                               
-                       case SORTED_REDUCE:
-                               locString = "Sorted Reduce";
-                               break;
-                               
-                       case SORTED_PARTIAL_REDUCE:
-                               locString = "Sorted Combine/Reduce";
-                               break;
-
-                       case SORTED_GROUP_REDUCE:
-                               locString = "Sorted Group Reduce";
-                               break;
-                               
-                       case SORTED_GROUP_COMBINE:
-                               locString = "Sorted Combine";
-                               break;
-
-                       case HYBRIDHASH_BUILD_FIRST:
-                               locString = "Hybrid Hash (build: " + child1name 
+ ")";
-                               break;
-                       case HYBRIDHASH_BUILD_SECOND:
-                               locString = "Hybrid Hash (build: " + child2name 
+ ")";
-                               break;
-                               
-                       case HYBRIDHASH_BUILD_FIRST_CACHED:
-                               locString = "Hybrid Hash (CACHED) (build: " + 
child1name + ")";
-                               break;
-                       case HYBRIDHASH_BUILD_SECOND_CACHED:
-                               locString = "Hybrid Hash (CACHED) (build: " + 
child2name + ")";
-                               break;
-
-                       case NESTEDLOOP_BLOCKED_OUTER_FIRST:
-                               locString = "Nested Loops (Blocked Outer: " + 
child1name + ")";
-                               break;
-                       case NESTEDLOOP_BLOCKED_OUTER_SECOND:
-                               locString = "Nested Loops (Blocked Outer: " + 
child2name + ")";
-                               break;
-                       case NESTEDLOOP_STREAMED_OUTER_FIRST:
-                               locString = "Nested Loops (Streamed Outer: " + 
child1name + ")";
-                               break;
-                       case NESTEDLOOP_STREAMED_OUTER_SECOND:
-                               locString = "Nested Loops (Streamed Outer: " + 
child2name + ")";
-                               break;
-
-                       case MERGE:
-                               locString = "Merge";
-                               break;
-
-                       case CO_GROUP:
-                               locString = "Co-Group";
-                               break;
-
-                       default:
-                               locString = p.getDriverStrategy().name();
-                               break;
-                       }
-
-                       if (locString != null) {
-                               writer.print(",\n\t\t\"driver_strategy\": \"");
-                               writer.print(locString);
-                               writer.print("\"");
-                       }
-               }
-               
-               {
-                       // output node global properties
-                       final GlobalProperties gp = p.getGlobalProperties();
-
-                       writer.print(",\n\t\t\"global_properties\": [\n");
-
-                       addProperty(writer, "Partitioning", 
gp.getPartitioning().name(), true);
-                       if (gp.getPartitioningFields() != null) {
-                               addProperty(writer, "Partitioned on", 
gp.getPartitioningFields().toString(), false);
-                       }
-                       if (gp.getPartitioningOrdering() != null) {
-                               addProperty(writer, "Partitioning Order", 
gp.getPartitioningOrdering().toString(), false);      
-                       }
-                       else {
-                               addProperty(writer, "Partitioning Order", 
"(none)", false);
-                       }
-                       if (n.getUniqueFields() == null || 
n.getUniqueFields().size() == 0) {
-                               addProperty(writer, "Uniqueness", "not unique", 
false);
-                       }
-                       else {
-                               addProperty(writer, "Uniqueness", 
n.getUniqueFields().toString(), false);       
-                       }
-
-                       writer.print("\n\t\t]");
-               }
-
-               {
-                       // output node local properties
-                       LocalProperties lp = p.getLocalProperties();
-
-                       writer.print(",\n\t\t\"local_properties\": [\n");
-
-                       if (lp.getOrdering() != null) {
-                               addProperty(writer, "Order", 
lp.getOrdering().toString(), true);        
-                       }
-                       else {
-                               addProperty(writer, "Order", "(none)", true);
-                       }
-                       if (lp.getGroupedFields() != null && 
lp.getGroupedFields().size() > 0) {
-                               addProperty(writer, "Grouped on", 
lp.getGroupedFields().toString(), false);
-                       } else {
-                               addProperty(writer, "Grouping", "not grouped", 
false);  
-                       }
-                       if (n.getUniqueFields() == null || 
n.getUniqueFields().size() == 0) {
-                               addProperty(writer, "Uniqueness", "not unique", 
false);
-                       }
-                       else {
-                               addProperty(writer, "Uniqueness", 
n.getUniqueFields().toString(), false);       
-                       }
-
-                       writer.print("\n\t\t]");
-               }
-
-               // output node size estimates
-               writer.print(",\n\t\t\"estimates\": [\n");
-
-               addProperty(writer, "Est. Output Size", 
n.getEstimatedOutputSize() == -1 ? "(unknown)"
-                       : formatNumber(n.getEstimatedOutputSize(), "B"), true);
-               addProperty(writer, "Est. Cardinality", 
n.getEstimatedNumRecords() == -1 ? "(unknown)"
-                       : formatNumber(n.getEstimatedNumRecords()), false);
-
-               writer.print("\t\t]");
-
-               // output node cost
-               if (p.getNodeCosts() != null) {
-                       writer.print(",\n\t\t\"costs\": [\n");
-
-                       addProperty(writer, "Network", 
p.getNodeCosts().getNetworkCost() == -1 ? "(unknown)"
-                               : 
formatNumber(p.getNodeCosts().getNetworkCost(), "B"), true);
-                       addProperty(writer, "Disk I/O", 
p.getNodeCosts().getDiskCost() == -1 ? "(unknown)"
-                               : formatNumber(p.getNodeCosts().getDiskCost(), 
"B"), false);
-                       addProperty(writer, "CPU", 
p.getNodeCosts().getCpuCost() == -1 ? "(unknown)"
-                               : formatNumber(p.getNodeCosts().getCpuCost(), 
""), false);
-
-                       addProperty(writer, "Cumulative Network",
-                               p.getCumulativeCosts().getNetworkCost() == -1 ? 
"(unknown)" : formatNumber(p
-                                       .getCumulativeCosts().getNetworkCost(), 
"B"), false);
-                       addProperty(writer, "Cumulative Disk I/O",
-                               p.getCumulativeCosts().getDiskCost() == -1 ? 
"(unknown)" : formatNumber(p
-                                       .getCumulativeCosts().getDiskCost(), 
"B"), false);
-                       addProperty(writer, "Cumulative CPU",
-                               p.getCumulativeCosts().getCpuCost() == -1 ? 
"(unknown)" : formatNumber(p
-                                       .getCumulativeCosts().getCpuCost(), 
""), false);
-
-                       writer.print("\n\t\t]");
-               }
-
-               // output the node compiler hints
-               if (n.getOperator().getCompilerHints() != null) {
-                       CompilerHints hints = 
n.getOperator().getCompilerHints();
-                       CompilerHints defaults = new CompilerHints();
-
-                       String size = hints.getOutputSize() == 
defaults.getOutputSize() ? "(none)" : String.valueOf(hints.getOutputSize());
-                       String card = hints.getOutputCardinality() == 
defaults.getOutputCardinality() ? "(none)" : 
String.valueOf(hints.getOutputCardinality());
-                       String width = hints.getAvgOutputRecordSize() == 
defaults.getAvgOutputRecordSize() ? "(none)" : 
String.valueOf(hints.getAvgOutputRecordSize());
-                       String filter = hints.getFilterFactor() == 
defaults.getFilterFactor() ? "(none)" : String.valueOf(hints.getFilterFactor());
-                       
-                       writer.print(",\n\t\t\"compiler_hints\": [\n");
-
-                       addProperty(writer, "Output Size (bytes)", size, true);
-                       addProperty(writer, "Output Cardinality", card, false);
-                       addProperty(writer, "Avg. Output Record Size (bytes)", 
width, false);
-                       addProperty(writer, "Filter Factor", filter, false);
-
-                       writer.print("\t\t]");
-               }
-
-               // finish node
-               writer.print("\n\t}");
-               return true;
-       }
-
-       /**
-        * Writes one entry of a JSON property list in the form
-        * <code>{ "name": "...", "value": "..." }</code>, indented by three tabs.
-        * Name and value are emitted verbatim; no escaping is applied here.
-        *
-        * @param writer The writer that receives the entry.
-        * @param name The text placed in the "name" attribute.
-        * @param value The text placed in the "value" attribute.
-        * @param first True for the first entry of a list; every other entry is
-        *              preceded by a comma and a line break.
-        */
-       private void addProperty(PrintWriter writer, String name, String value, boolean first) {
-               // only entries after the first need a separator from their predecessor
-               final String separator = first ? "" : ",\n";
-               writer.print(separator + "\t\t\t{ \"name\": \"" + name + "\", \"value\": \"" + value + "\" }");
-       }
-
-       /**
-        * Formats the given number via {@link #formatNumber(double, String)},
-        * passing the empty string as the suffix.
-        *
-        * @param number The number to format.
-        * @return The formatted number.
-        */
-       public static final String formatNumber(double number) {
-               final String noSuffix = "";
-               return formatNumber(number, noSuffix);
-       }
-
-       /**
-        * Formats a number with two decimal places (using {@link Locale#US}),
-        * scaling it down in steps of one thousand and appending the matching
-        * unit letter from SIZE_SUFFIXES once the value needs more than three
-        * digits before the decimal point. Values less than or equal to zero are
-        * returned unscaled as produced by {@link String#valueOf(double)}.
-        *
-        * @param number The number to format.
-        * @param suffix Accepted for callers, but not read by this implementation.
-        * @return The formatted number, e.g. a plain "%.2f" value, or the scaled
-        *         value followed by a space and the unit letter.
-        */
-       public static final String formatNumber(double number, String suffix) {
-               if (number <= 0.0) {
-                       return String.valueOf(number);
-               }
-
-               // number of decimal digits in front of the decimal point
-               final int magnitude = (int) Math.ceil(Math.log10(number));
-
-               // index of the thousands-unit, clamped to the available unit letters
-               final int unit = Math.max(0, Math.min((magnitude - 1) / 3, SIZE_SUFFIXES.length - 1));
-
-               // shift the decimal point left by three digits per unit step
-               double scaled = number;
-               for (int shift = unit * 3; shift > 0; shift--) {
-                       scaled /= 10;
-               }
-
-               if (unit == 0) {
-                       return String.format(Locale.US, "%.2f", scaled);
-               }
-               return String.format(Locale.US, "%.2f %s", scaled, SIZE_SUFFIXES[unit]);
-       }
-
-       private static final char[] SIZE_SUFFIXES = { 0, 'K', 'M', 'G', 'T' }; // unit letter per thousands-group; index 0 is a placeholder that formatNumber never prints
-}

Reply via email to