http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TempMode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TempMode.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TempMode.java
deleted file mode 100644
index 0d1dfc9..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TempMode.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-/**
- * Enumeration to indicate the mode of temporarily materializing the data that flows across a connection.
- * Introducing such an artificial dam is sometimes necessary to prevent certain data flows from
- * deadlocking themselves, or to provide a cache from which an intermediate result can be replayed.
- *
- * Every constant is one combination of two independent flags, {@code cached} and
- * {@code breaksPipeline}. The {@code make...} methods return the constant with one flag
- * changed and the other flag kept; they never modify the receiver.
- */
-public enum TempMode {
-       
-       NONE(false, false),                     // not cached, does not break the pipeline
-       PIPELINE_BREAKER(false, true),          // not cached, breaks the pipeline
-       CACHED(true, false),                    // cached, does not break the pipeline
-       CACHING_PIPELINE_BREAKER(true, true);   // cached and breaks the pipeline
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       private final boolean cached;           // first constructor argument
-       
-       private final boolean breaksPipeline;   // second constructor argument
-       
-       /** Stores the two flags that define a constant. */
-       private TempMode(boolean cached, boolean breaksPipeline) {
-               this.cached = cached;
-               this.breaksPipeline = breaksPipeline;
-       }
-       /** Returns the {@code cached} flag of this mode. */
-       public boolean isCached() {
-               return cached;
-       }
-       /** Returns the {@code breaksPipeline} flag of this mode. */
-       public boolean breaksPipeline() {
-               return breaksPipeline;
-       }
-       /**
-        * Returns the mode with the same {@code cached} flag that also breaks the pipeline:
-        * NONE becomes PIPELINE_BREAKER, CACHED becomes CACHING_PIPELINE_BREAKER, and the
-        * two modes that already break the pipeline are returned unchanged.
-        */
-       public TempMode makePipelineBreaker() {
-               if (this == NONE) {
-                       return PIPELINE_BREAKER;
-               } else if (this == CACHED) {
-                       return CACHING_PIPELINE_BREAKER;
-               } else {
-                       return this;
-               }
-       }
-       /**
-        * Returns the mode with the same {@code breaksPipeline} flag that is also cached:
-        * NONE becomes CACHED, PIPELINE_BREAKER becomes CACHING_PIPELINE_BREAKER, and the
-        * two modes that are already cached are returned unchanged.
-        */
-       public TempMode makeCached() {
-               if (this == NONE) {
-                       return CACHED;
-               } else if (this == PIPELINE_BREAKER) {
-                       return CACHING_PIPELINE_BREAKER;
-               } else {
-                       return this;
-               }
-       }
-       
-       /**
-        * Inverse of makeCached(): CACHED becomes NONE, CACHING_PIPELINE_BREAKER becomes
-        * PIPELINE_BREAKER, and the two non-cached modes are returned unchanged.
-        */
-       public TempMode makeNonCached() {
-               if (this == CACHED) {
-                       return NONE;
-               } else if (this == CACHING_PIPELINE_BREAKER) {
-                       return PIPELINE_BREAKER;
-               } else {
-                       return this;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
deleted file mode 100644
index 39da165..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
+++ /dev/null
@@ -1,747 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.operators.DualInputOperator;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.InterestingProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
-import 
org.apache.flink.optimizer.operators.OperatorDescriptorDual.GlobalPropertiesPair;
-import 
org.apache.flink.optimizer.operators.OperatorDescriptorDual.LocalPropertiesPair;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.NamedChannel;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.operators.DamBehavior;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.Visitor;
-
-import com.google.common.collect.Sets;
-
-/**
- * A node in the optimizer plan that represents a PACT with two different inputs, such as MATCH or CROSS.
- * The two inputs are not interchangeable: each one is bound to its own side.
- */
-public abstract class TwoInputNode extends OptimizerNode {
-       
-       protected final FieldList keys1; // The set of key fields for the first 
input
-       
-       protected final FieldList keys2; // The set of key fields for the 
second input
-       
-       protected DagConnection input1; // The first input edge
-
-       protected DagConnection input2; // The second input edge
-       
-       private List<OperatorDescriptorDual> cachedDescriptors;
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Creates a new optimizer node for an operator with two inputs and reads the key
-        * columns of both inputs from it. A null or empty key array is stored as null in
-        * keys1 / keys2.
-        * 
-        * @param pactContract
-        *        The PACT that the node represents.
-        * @throws CompilerException
-        *         If keys are set on only one of the two inputs, or if the two inputs
-        *         have a different number of key fields.
-        */
-       public TwoInputNode(DualInputOperator<?, ?, ?, ?> pactContract) {
-               super(pactContract);
-
-               int[] k1 = pactContract.getKeyColumns(0);
-               int[] k2 = pactContract.getKeyColumns(1);
-               
-               this.keys1 = k1 == null || k1.length == 0 ? null : new 
FieldList(k1);
-               this.keys2 = k2 == null || k2.length == 0 ? null : new 
FieldList(k2);
-               
-               if (this.keys1 != null) {
-                       if (this.keys2 != null) {
-                               if (this.keys1.size() != this.keys2.size()) {
-                                       throw new CompilerException("Unequal 
number of key fields on the two inputs.");
-                               }
-                       } else {
-                               throw new CompilerException("Keys are set on 
first input, but not on second.");
-                       }
-               } else if (this.keys2 != null) {
-                       throw new CompilerException("Keys are set on second 
input, but not on first.");
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       /** Returns the operator obtained from the superclass, cast to DualInputOperator. */
-       @Override
-       public DualInputOperator<?, ?, ?, ?> getOperator() {
-               return (DualInputOperator<?, ?, ?, ?>) super.getOperator();
-       }
-
-       /**
-        * Gets the <tt>DagConnection</tt> through which this node receives its <i>first</i> input.
-        * 
-        * @return The first input connection. The field is assigned in setInput(...);
-        *         presumably null before that call (to be confirmed).
-        */
-       public DagConnection getFirstIncomingConnection() {
-               return this.input1;
-       }
-
-       /**
-        * Gets the <tt>DagConnection</tt> through which this node receives its <i>second</i> input.
-        * 
-        * @return The second input connection. The field is assigned in setInput(...);
-        *         presumably null before that call (to be confirmed).
-        */
-       public DagConnection getSecondIncomingConnection() {
-               return this.input2;
-       }
-       /**
-        * Gets the node at the source end of the first input connection.
-        * 
-        * @return The source of the first input connection, or null if that connection is not set.
-        */
-       public OptimizerNode getFirstPredecessorNode() {
-               if(this.input1 != null) {
-                       return this.input1.getSource();
-               } else {
-                       return null;
-               }
-       }
-       /**
-        * Gets the node at the source end of the second input connection.
-        * 
-        * @return The source of the second input connection, or null if that connection is not set.
-        */
-       public OptimizerNode getSecondPredecessorNode() {
-               if(this.input2 != null) {
-                       return this.input2.getSource();
-               } else {
-                       return null;
-               }
-       }
-       /**
-        * Returns a newly created list holding the first and the second input connection, in that order.
-        * The two fields are added as they are, without a null check.
-        */
-       @Override
-       public List<DagConnection> getIncomingConnections() {
-               ArrayList<DagConnection> inputs = new 
ArrayList<DagConnection>(2);
-               inputs.add(input1);
-               inputs.add(input2);
-               return inputs;
-       }
-
-       /**
-        * Creates the two incoming connections of this node.
-        * 
-        * Ship strategy hints are read from the operator's parameters in three steps: the hint
-        * for both inputs first, then the hint for the first input and the hint for the second
-        * input, each of which overwrites the value taken from the hint for both inputs.
-        * Afterwards the predecessor node of each input is looked up in the given map, a
-        * DagConnection from that predecessor to this node is created with the given execution
-        * mode, a hinted ship strategy (if any) is set on it, and the connection is stored in
-        * input1 / input2 and added to the predecessor's outgoing connections.
-        * 
-        * @param contractToNode Map from operators to the optimizer nodes created for them.
-        * @param defaultExecutionMode Execution mode passed to both new connections.
-        * @throws CompilerException If a hint has an unknown value, or if the operator has no
-        *         first or no second input set.
-        */
-       @Override
-       public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, 
ExecutionMode defaultExecutionMode) {
-               // see if there is a hint that dictates which shipping strategy 
to use for BOTH inputs
-               final Configuration conf = getOperator().getParameters();
-               ShipStrategyType preSet1 = null;
-               ShipStrategyType preSet2 = null;
-               
-               String shipStrategy = 
conf.getString(Optimizer.HINT_SHIP_STRATEGY, null);
-               if (shipStrategy != null) {
-                       if 
(Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
-                               preSet1 = preSet2 = ShipStrategyType.FORWARD;
-                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
-                               preSet1 = preSet2 = ShipStrategyType.BROADCAST;
-                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
-                               preSet1 = preSet2 = 
ShipStrategyType.PARTITION_HASH;
-                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
-                               preSet1 = preSet2 = 
ShipStrategyType.PARTITION_RANGE;
-                       } else if 
(shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) { // unlike the hints above, compared ignoring case
-                               preSet1 = preSet2 = 
ShipStrategyType.PARTITION_RANDOM;
-                       } else {
-                               throw new CompilerException("Unknown hint for 
shipping strategy: " + shipStrategy);
-                       }
-               }
-
-               // see if there is a hint that dictates which shipping strategy 
to use for the FIRST input
-               shipStrategy = 
conf.getString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, null);
-               if (shipStrategy != null) {
-                       if 
(Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
-                               preSet1 = ShipStrategyType.FORWARD;
-                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
-                               preSet1 = ShipStrategyType.BROADCAST;
-                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
-                               preSet1 = ShipStrategyType.PARTITION_HASH;
-                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
-                               preSet1 = ShipStrategyType.PARTITION_RANGE;
-                       } else if 
(shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) { // unlike the hints above, compared ignoring case
-                               preSet1 = ShipStrategyType.PARTITION_RANDOM;
-                       } else {
-                               throw new CompilerException("Unknown hint for 
shipping strategy of input one: " + shipStrategy);
-                       }
-               }
-
-               // see if there is a hint that dictates which shipping strategy 
to use for the SECOND input
-               shipStrategy = 
conf.getString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, null);
-               if (shipStrategy != null) {
-                       if 
(Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
-                               preSet2 = ShipStrategyType.FORWARD;
-                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
-                               preSet2 = ShipStrategyType.BROADCAST;
-                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
-                               preSet2 = ShipStrategyType.PARTITION_HASH;
-                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
-                               preSet2 = ShipStrategyType.PARTITION_RANGE;
-                       } else if 
(shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) { // unlike the hints above, compared ignoring case
-                               preSet2 = ShipStrategyType.PARTITION_RANDOM;
-                       } else {
-                               throw new CompilerException("Unknown hint for 
shipping strategy of input two: " + shipStrategy);
-                       }
-               }
-               
-               // get the predecessors
-               DualInputOperator<?, ?, ?, ?> contr = getOperator();
-               
-               Operator<?> leftPred = contr.getFirstInput();
-               Operator<?> rightPred = contr.getSecondInput();
-               
-               OptimizerNode pred1;
-               DagConnection conn1;
-               if (leftPred == null) {
-                       throw new CompilerException("Error: Node for '" + 
getOperator().getName() + "' has no input set for first input.");
-               } else {
-                       pred1 = contractToNode.get(leftPred); // NOTE(review): lookup result is not null-checked here; confirm the map always holds the predecessor
-                       conn1 = new DagConnection(pred1, this, 
defaultExecutionMode);
-                       if (preSet1 != null) {
-                               conn1.setShipStrategy(preSet1);
-                       }
-               } 
-               
-               // store the first connection and add it to the predecessor's outgoing connections
-               this.input1 = conn1;
-               pred1.addOutgoingConnection(conn1);
-               
-               OptimizerNode pred2;
-               DagConnection conn2;
-               if (rightPred == null) {
-                       throw new CompilerException("Error: Node for '" + 
getOperator().getName() + "' has no input set for second input.");
-               } else {
-                       pred2 = contractToNode.get(rightPred); // NOTE(review): lookup result is not null-checked here; confirm the map always holds the predecessor
-                       conn2 = new DagConnection(pred2, this, 
defaultExecutionMode);
-                       if (preSet2 != null) {
-                               conn2.setShipStrategy(preSet2);
-                       }
-               }
-               
-               // store the second connection and add it to the predecessor's outgoing connections
-               this.input2 = conn2;
-               pred2.addOutgoingConnection(conn2);
-       }
-       /**
-        * Supplies the operator descriptors of this node. Within this class it is called
-        * only by getProperties(), which keeps the returned list for later calls.
-        */
-       protected abstract List<OperatorDescriptorDual> getPossibleProperties();
-       /**
-        * Returns the operator descriptors of this node. The list is obtained from
-        * getPossibleProperties() on the first call and stored in cachedDescriptors;
-        * later calls return the stored list.
-        */
-       private List<OperatorDescriptorDual> getProperties() {
-               if (this.cachedDescriptors == null) {
-                       this.cachedDescriptors = getPossibleProperties();
-               }
-               return this.cachedDescriptors;
-       }
-       /**
-        * Computes the interesting properties of both inputs and sets them on the connections.
-        * 
-        * For each input, the starting point is this node's own interesting properties filtered
-        * by the code annotations for that input (index 0 and 1). To these, the requested global
-        * and local properties of every operator descriptor of this node are added, first member
-        * of each pair to the first input and second member to the second input. Every broadcast
-        * connection receives a new, empty InterestingProperties object.
-        * 
-        * @param estimator Not used in this method body.
-        */
-       @Override
-       public void computeInterestingPropertiesForInputs(CostEstimator 
estimator) {
-               // get what we inherit and what is preserved by our user code 
-               final InterestingProperties props1 = 
getInterestingProperties().filterByCodeAnnotations(this, 0);
-               final InterestingProperties props2 = 
getInterestingProperties().filterByCodeAnnotations(this, 1);
-               
-               // add all properties relevant to this node
-               for (OperatorDescriptorDual dpd : getProperties()) {
-                       for (GlobalPropertiesPair gp : 
dpd.getPossibleGlobalProperties()) {
-                               // input 1
-                               props1.addGlobalProperties(gp.getProperties1());
-                               
-                               // input 2
-                               props2.addGlobalProperties(gp.getProperties2());
-                       }
-                       for (LocalPropertiesPair lp : 
dpd.getPossibleLocalProperties()) {
-                               // input 1
-                               props1.addLocalProperties(lp.getProperties1());
-                               
-                               // input 2
-                               props2.addLocalProperties(lp.getProperties2());
-                       }
-               }
-               this.input1.setInterestingProperties(props1);
-               this.input2.setInterestingProperties(props2);
-               
-               for (DagConnection conn : getBroadcastConnections()) {
-                       conn.setInterestingProperties(new 
InterestingProperties());
-               }
-       }
-
-       @Override
-       public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
-               // check if we have a cached version
-               if (this.cachedPlans != null) {
-                       return this.cachedPlans;
-               }
-
-               boolean childrenSkippedDueToReplicatedInput = false;
-
-               // step down to all producer nodes and calculate alternative 
plans
-               final List<? extends PlanNode> subPlans1 = 
getFirstPredecessorNode().getAlternativePlans(estimator);
-               final List<? extends PlanNode> subPlans2 = 
getSecondPredecessorNode().getAlternativePlans(estimator);
-
-               // calculate alternative sub-plans for predecessor
-               final Set<RequestedGlobalProperties> intGlobal1 = 
this.input1.getInterestingProperties().getGlobalProperties();
-               final Set<RequestedGlobalProperties> intGlobal2 = 
this.input2.getInterestingProperties().getGlobalProperties();
-               
-               // calculate alternative sub-plans for broadcast inputs
-               final List<Set<? extends NamedChannel>> broadcastPlanChannels = 
new ArrayList<Set<? extends NamedChannel>>();
-               List<DagConnection> broadcastConnections = 
getBroadcastConnections();
-               List<String> broadcastConnectionNames = 
getBroadcastConnectionNames();
-
-               for (int i = 0; i < broadcastConnections.size(); i++ ) {
-                       DagConnection broadcastConnection = 
broadcastConnections.get(i);
-                       String broadcastConnectionName = 
broadcastConnectionNames.get(i);
-                       List<PlanNode> broadcastPlanCandidates = 
broadcastConnection.getSource().getAlternativePlans(estimator);
-
-                       // wrap the plan candidates in named channels
-                       HashSet<NamedChannel> broadcastChannels = new 
HashSet<NamedChannel>(broadcastPlanCandidates.size());
-                       for (PlanNode plan: broadcastPlanCandidates) {
-                               final NamedChannel c = new 
NamedChannel(broadcastConnectionName, plan);
-                               DataExchangeMode exMode = 
DataExchangeMode.select(broadcastConnection.getDataExchangeMode(),
-                                                                               
        ShipStrategyType.BROADCAST, broadcastConnection.isBreakingPipeline());
-                               c.setShipStrategy(ShipStrategyType.BROADCAST, 
exMode);
-                               broadcastChannels.add(c);
-                       }
-                       broadcastPlanChannels.add(broadcastChannels);
-               }
-               
-               final GlobalPropertiesPair[] allGlobalPairs;
-               final LocalPropertiesPair[] allLocalPairs;
-               {
-                       Set<GlobalPropertiesPair> pairsGlob = new 
HashSet<GlobalPropertiesPair>();
-                       Set<LocalPropertiesPair> pairsLoc = new 
HashSet<LocalPropertiesPair>();
-                       for (OperatorDescriptorDual ods : getProperties()) {
-                               
pairsGlob.addAll(ods.getPossibleGlobalProperties());
-                               
pairsLoc.addAll(ods.getPossibleLocalProperties());
-                       }
-                       allGlobalPairs = pairsGlob.toArray(new 
GlobalPropertiesPair[pairsGlob.size()]);
-                       allLocalPairs = pairsLoc.toArray(new 
LocalPropertiesPair[pairsLoc.size()]);
-               }
-               
-               final ArrayList<PlanNode> outputPlans = new 
ArrayList<PlanNode>();
-
-               final ExecutionMode input1Mode = 
this.input1.getDataExchangeMode();
-               final ExecutionMode input2Mode = 
this.input2.getDataExchangeMode();
-
-               final int dop = getParallelism();
-               final int inDop1 = getFirstPredecessorNode().getParallelism();
-               final int inDop2 = getSecondPredecessorNode().getParallelism();
-
-               final boolean dopChange1 = dop != inDop1;
-               final boolean dopChange2 = dop != inDop2;
-
-               final boolean input1breaksPipeline = 
this.input1.isBreakingPipeline();
-               final boolean input2breaksPipeline = 
this.input2.isBreakingPipeline();
-
-               // enumerate all pairwise combination of the children's plans 
together with
-               // all possible operator strategy combination
-               
-               // create all candidates
-               for (PlanNode child1 : subPlans1) {
-
-                       if (child1.getGlobalProperties().isFullyReplicated()) {
-                               // fully replicated input is always locally 
forwarded if DOP is not changed
-                               if (dopChange1) {
-                                       // can not continue with this child
-                                       childrenSkippedDueToReplicatedInput = 
true;
-                                       continue;
-                               } else {
-                                       
this.input1.setShipStrategy(ShipStrategyType.FORWARD);
-                               }
-                       }
-
-                       for (PlanNode child2 : subPlans2) {
-
-                               if 
(child2.getGlobalProperties().isFullyReplicated()) {
-                                       // fully replicated input is always 
locally forwarded if DOP is not changed
-                                       if (dopChange2) {
-                                               // can not continue with this 
child
-                                               
childrenSkippedDueToReplicatedInput = true;
-                                               continue;
-                                       } else {
-                                               
this.input2.setShipStrategy(ShipStrategyType.FORWARD);
-                                       }
-                               }
-                               
-                               // check that the children go together. that is 
the case if they build upon the same
-                               // candidate at the joined branch plan. 
-                               if (!areBranchCompatible(child1, child2)) {
-                                       continue;
-                               }
-                               
-                               for (RequestedGlobalProperties igps1: 
intGlobal1) {
-                                       // create a candidate channel for the 
first input. mark it cached, if the connection says so
-                                       final Channel c1 = new Channel(child1, 
this.input1.getMaterializationMode());
-                                       if (this.input1.getShipStrategy() == 
null) {
-                                               // free to choose the ship 
strategy
-                                               igps1.parameterizeChannel(c1, 
dopChange1, input1Mode, input1breaksPipeline);
-                                               
-                                               // if the DOP changed, make 
sure that we cancel out properties, unless the
-                                               // ship strategy 
preserves/establishes them even under changing DOPs
-                                               if (dopChange1 && 
!c1.getShipStrategy().isNetworkStrategy()) {
-                                                       
c1.getGlobalProperties().reset();
-                                               }
-                                       }
-                                       else {
-                                               // ship strategy fixed by 
compiler hint
-                                               ShipStrategyType shipType = 
this.input1.getShipStrategy();
-                                               DataExchangeMode exMode = 
DataExchangeMode.select(input1Mode, shipType, input1breaksPipeline);
-                                               if (this.keys1 != null) {
-                                                       
c1.setShipStrategy(shipType, this.keys1.toFieldList(), exMode);
-                                               }
-                                               else {
-                                                       
c1.setShipStrategy(shipType, exMode);
-                                               }
-                                               
-                                               if (dopChange1) {
-                                                       
c1.adjustGlobalPropertiesForFullParallelismChange();
-                                               }
-                                       }
-                                       
-                                       for (RequestedGlobalProperties igps2: 
intGlobal2) {
-                                               // create a candidate channel 
for the first input. mark it cached, if the connection says so
-                                               final Channel c2 = new 
Channel(child2, this.input2.getMaterializationMode());
-                                               if 
(this.input2.getShipStrategy() == null) {
-                                                       // free to choose the 
ship strategy
-                                                       
igps2.parameterizeChannel(c2, dopChange2, input2Mode, input2breaksPipeline);
-                                                       
-                                                       // if the DOP changed, 
make sure that we cancel out properties, unless the
-                                                       // ship strategy 
preserves/establishes them even under changing DOPs
-                                                       if (dopChange2 && 
!c2.getShipStrategy().isNetworkStrategy()) {
-                                                               
c2.getGlobalProperties().reset();
-                                                       }
-                                               } else {
-                                                       // ship strategy fixed 
by compiler hint
-                                                       ShipStrategyType 
shipType = this.input2.getShipStrategy();
-                                                       DataExchangeMode exMode 
= DataExchangeMode.select(input2Mode, shipType, input2breaksPipeline);
-                                                       if (this.keys2 != null) 
{
-                                                               
c2.setShipStrategy(shipType, this.keys2.toFieldList(), exMode);
-                                                       } else {
-                                                               
c2.setShipStrategy(shipType, exMode);
-                                                       }
-                                                       
-                                                       if (dopChange2) {
-                                                               
c2.adjustGlobalPropertiesForFullParallelismChange();
-                                                       }
-                                               }
-                                               
-                                               /* 
********************************************************************
-                                                * NOTE: Depending on how we 
proceed with different partitioning,
-                                                *       we might at some point 
need a compatibility check between
-                                                *       the pairs of global 
properties.
-                                                * 
*******************************************************************/
-                                               
-                                               outer:
-                                               for (GlobalPropertiesPair gpp : 
allGlobalPairs) {
-                                                       if 
(gpp.getProperties1().isMetBy(c1.getGlobalProperties()) && 
-                                                               
gpp.getProperties2().isMetBy(c2.getGlobalProperties()) )
-                                                       {
-                                                               for 
(OperatorDescriptorDual desc : getProperties()) {
-                                                                       if 
(desc.areCompatible(gpp.getProperties1(), gpp.getProperties2(), 
-                                                                               
        c1.getGlobalProperties(), c2.getGlobalProperties()))
-                                                                       {
-                                                                               
Channel c1Clone = c1.clone();
-                                                                               
c1Clone.setRequiredGlobalProps(gpp.getProperties1());
-                                                                               
c2.setRequiredGlobalProps(gpp.getProperties2());
-                                                                               
-                                                                               
// we form a valid combination, so create the local candidates
-                                                                               
// for this
-                                                                               
addLocalCandidates(c1Clone, c2, broadcastPlanChannels, igps1, igps2,
-                                                                               
                                                                        
outputPlans, allLocalPairs, estimator);
-                                                                               
break outer;
-                                                                       }
-                                                               }
-                                                       }
-                                               }
-                                               
-                                               // break the loop over input2's 
possible global properties, if the property
-                                               // is fixed via a hint. All the 
properties are overridden by the hint anyways,
-                                               // so we can stop after the 
first
-                                               if 
(this.input2.getShipStrategy() != null) {
-                                                       break;
-                                               }
-                                       }
-                                       
-                                       // break the loop over input1's 
possible global properties, if the property
-                                       // is fixed via a hint. All the 
properties are overridden by the hint anyways,
-                                       // so we can stop after the first
-                                       if (this.input1.getShipStrategy() != 
null) {
-                                               break;
-                                       }
-                               }
-                       }
-               }
-
-               if(outputPlans.isEmpty()) {
-                       if(childrenSkippedDueToReplicatedInput) {
-                               throw new CompilerException("No plan meeting 
the requirements could be created @ " + this
-                                                                               
        + ". Most likely reason: Invalid use of replicated input.");
-                       } else {
-                               throw new CompilerException("No plan meeting 
the requirements could be created @ " + this
-                                                                               
        + ". Most likely reason: Too restrictive plan hints.");
-                       }
-               }
-
-               // cost and prune the plans
-               for (PlanNode node : outputPlans) {
-                       estimator.costOperator(node);
-               }
-               prunePlanAlternatives(outputPlans);
-               outputPlans.trimToSize();
-
-               this.cachedPlans = outputPlans;
-               return outputPlans;
-       }
-       
-       protected void addLocalCandidates(Channel template1, Channel template2, // Calls instantiate(...) for each accepted combination of requested local properties on the two inputs; results go to 'target'.
List<Set<? extends NamedChannel>> broadcastPlanChannels, 
-                       RequestedGlobalProperties rgps1, 
RequestedGlobalProperties rgps2,
-                       List<PlanNode> target, LocalPropertiesPair[] 
validLocalCombinations, CostEstimator estimator) // NOTE(review): validLocalCombinations is never read below; the pairs come from dps.getPossibleLocalProperties()
-       {
-               for (RequestedLocalProperties ilp1 : 
this.input1.getInterestingProperties().getLocalProperties()) {
-                       final Channel in1 = template1.clone(); // fresh clone per ilp1; parameterizeChannel is applied to the clone, not to template1
-                       ilp1.parameterizeChannel(in1);
-                       
-                       for (RequestedLocalProperties ilp2 : 
this.input2.getInterestingProperties().getLocalProperties()) {
-                               final Channel in2 = template2.clone(); // same for the second input
-                               ilp2.parameterizeChannel(in2);
-                               
-                               for (OperatorDescriptorDual dps: 
getProperties()) {
-                                       for (LocalPropertiesPair lpp : 
dps.getPossibleLocalProperties()) {
-                                               if 
(lpp.getProperties1().isMetBy(in1.getLocalProperties()) &&
-                                                       
lpp.getProperties2().isMetBy(in2.getLocalProperties()) )
-                                               {
-                                                       // valid combination
-                                                       // for non trivial 
local properties, we need to check that they are co compatible
-                                                       // (such as when some 
sort order is requested, that both are the same sort order
-                                                       if 
(dps.areCoFulfilled(lpp.getProperties1(), lpp.getProperties2(), 
-                                                               
in1.getLocalProperties(), in2.getLocalProperties()))
-                                                       {
-                                                               // copy, 
because setting required properties and instantiation may
-                                                               // change the 
channels and should not affect prior candidates
-                                                               Channel in1Copy 
= in1.clone();
-                                                               
in1Copy.setRequiredLocalProps(lpp.getProperties1());
-                                                               
-                                                               Channel in2Copy 
= in2.clone();
-                                                               
in2Copy.setRequiredLocalProps(lpp.getProperties2());
-                                                               
-                                                               // all right, 
co compatible
-                                                               
instantiate(dps, in1Copy, in2Copy, broadcastPlanChannels, target, estimator, 
rgps1, rgps2, ilp1, ilp2);
-                                                               break; // leaves only the lpp loop: at most one instantiate(...) call per descriptor for this in1/in2 pair
-                                                       }
-                                                       // else cannot use this 
pair, fall through the loop and try the next one
-                                               }
-                                       }
-                               }
-                       }
-               }
-       }
-       
-       protected void instantiate(OperatorDescriptorDual operator, Channel // Adds to 'target' one DualInputPlanNode per branch-compatible combination of broadcast channels.
in1, Channel in2,
-                       List<Set<? extends NamedChannel>> 
broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator,
-                       RequestedGlobalProperties globPropsReq1, 
RequestedGlobalProperties globPropsReq2,
-                       RequestedLocalProperties locPropsReq1, 
RequestedLocalProperties locPropsReq2)
-       {
-               final PlanNode inputSource1 = in1.getSource();
-               final PlanNode inputSource2 = in2.getSource();
-               
-               for (List<NamedChannel> broadcastChannelsCombination: 
Sets.cartesianProduct(broadcastPlanChannels)) {
-                       
-                       boolean validCombination = true;
-                       
-                       // check whether the broadcast inputs use the same plan 
candidate at the branching point
-                       for (int i = 0; i < 
broadcastChannelsCombination.size(); i++) {
-                               NamedChannel nc = 
broadcastChannelsCombination.get(i);
-                               PlanNode bcSource = nc.getSource();
-                               
-                               if (!(areBranchCompatible(bcSource, 
inputSource1) || areBranchCompatible(bcSource, inputSource2))) { // rejected when compatible with neither regular input
-                                       validCombination = false;
-                                       break;
-                               }
-                               
-                               // check branch compatibility against all other 
broadcast variables
-                               for (int k = 0; k < i; k++) {
-                                       PlanNode otherBcSource = 
broadcastChannelsCombination.get(k).getSource();
-                                       
-                                       if (!areBranchCompatible(bcSource, 
otherBcSource)) {
-                                               validCombination = false;
-                                               break; // exits only the k-loop; the i-loop keeps running, but validCombination is never set back to true
-                                       }
-                               }
-                       }
-                       
-                       if (!validCombination) {
-                               continue;
-                       }
-                       // may switch in1/in2 to a pipeline-breaking temp mode; the same two channel objects are used for every combination
-                       
placePipelineBreakersIfNecessary(operator.getStrategy(), in1, in2);
-                       
-                       DualInputPlanNode node = operator.instantiate(in1, in2, 
this);
-                       node.setBroadcastInputs(broadcastChannelsCombination);
-
-                       SemanticProperties props = this.getSemanticProperties();
-                       GlobalProperties gp1 = 
in1.getGlobalProperties().clone().filterBySemanticProperties(props, 0); // clone first, then filter for input index 0
-                       GlobalProperties gp2 = 
in2.getGlobalProperties().clone().filterBySemanticProperties(props, 1); // input index 1
-                       GlobalProperties combined = 
operator.computeGlobalProperties(gp1, gp2);
-
-                       LocalProperties lp1 = 
in1.getLocalProperties().clone().filterBySemanticProperties(props, 0);
-                       LocalProperties lp2 = 
in2.getLocalProperties().clone().filterBySemanticProperties(props, 1);
-                       LocalProperties locals = 
operator.computeLocalProperties(lp1, lp2);
-                       
-                       node.initProperties(combined, locals);
-                       node.updatePropertiesWithUniqueSets(getUniqueFields());
-                       target.add(node);
-               }
-       }
-       
-       protected void placePipelineBreakersIfNecessary(DriverStrategy // Compares the dams on both inputs' paths down to the branches joined at this node and sets a pipeline-breaking temp mode where the two sides differ.
strategy, Channel in1, Channel in2) {
-               // before we instantiate, check for deadlocks by tracing back 
to the open branches and checking
-               // whether either no input, or all of them have a dam
-               if (this.hereJoinedBranches != null && 
this.hereJoinedBranches.size() > 0) {
-                       boolean someDamOnLeftPaths = false;
-                       boolean damOnAllLeftPaths = true;
-                       boolean someDamOnRightPaths = false;
-                       boolean damOnAllRightPaths = true;
-                       // left side (in1): a full dam in the driver, a damming local strategy, or a pipeline-breaking temp mode already counts as a dam
-                       if (strategy.firstDam() == DamBehavior.FULL_DAM || 
in1.getLocalStrategy().dams() || in1.getTempMode().breaksPipeline()) {
-                               someDamOnLeftPaths = true;
-                       } else {
-                               for (OptimizerNode brancher : 
this.hereJoinedBranches) {
-                                       PlanNode candAtBrancher = 
in1.getSource().getCandidateAtBranchPoint(brancher);
-                                       
-                                       // not all candidates are found, 
because this list includes joined branched from both regular inputs and 
broadcast vars
-                                       if (candAtBrancher == null) {
-                                               continue;
-                                       }
-                                       
-                                       SourceAndDamReport res = 
in1.getSource().hasDamOnPathDownTo(candAtBrancher);
-                                       if (res == NOT_FOUND) {
-                                               throw new 
CompilerException("Bug: Tracing dams for deadlock detection is broken.");
-                                       } else if (res == FOUND_SOURCE) {
-                                               damOnAllLeftPaths = false;
-                                       } else if (res == FOUND_SOURCE_AND_DAM) 
{
-                                               someDamOnLeftPaths = true;
-                                       } else {
-                                               throw new CompilerException(); // any other report value is unexpected here
-                                       }
-                               }
-                       }
-                       // right side (in2): same tracing as for the left side
-                       if (strategy.secondDam() == DamBehavior.FULL_DAM || 
in2.getLocalStrategy().dams() || in2.getTempMode().breaksPipeline()) {
-                               someDamOnRightPaths = true;
-                       } else {
-                               for (OptimizerNode brancher : 
this.hereJoinedBranches) {
-                                       PlanNode candAtBrancher = 
in2.getSource().getCandidateAtBranchPoint(brancher);
-                                       
-                                       // not all candidates are found, 
because this list includes joined branched from both regular inputs and 
broadcast vars
-                                       if (candAtBrancher == null) {
-                                               continue;
-                                       }
-                                       
-                                       SourceAndDamReport res = 
in2.getSource().hasDamOnPathDownTo(candAtBrancher);
-                                       if (res == NOT_FOUND) {
-                                               throw new 
CompilerException("Bug: Tracing dams for deadlock detection is broken.");
-                                       } else if (res == FOUND_SOURCE) {
-                                               damOnAllRightPaths = false;
-                                       } else if (res == FOUND_SOURCE_AND_DAM) 
{
-                                               someDamOnRightPaths = true;
-                                       } else {
-                                               throw new CompilerException(); // any other report value is unexpected here
-                                       }
-                               }
-                       }
-                       
-                       // okay combinations are both all dam or both no dam
-                       if ( (damOnAllLeftPaths & damOnAllRightPaths) | 
(!someDamOnLeftPaths & !someDamOnRightPaths) ) { // non-short-circuit & and | on plain booleans: same result as && and ||
-                               // good, either both materialize already on the 
way, or both fully pipeline
-                       } else {
-                               if (someDamOnLeftPaths & !damOnAllRightPaths) {
-                                       // right needs a pipeline breaker
-                                       
in2.setTempMode(in2.getTempMode().makePipelineBreaker());
-                               }
-                               
-                               if (someDamOnRightPaths & !damOnAllLeftPaths) {
-                                       // left needs a pipeline breaker
-                                       
in1.setTempMode(in1.getTempMode().makePipelineBreaker());
-                               }
-                       }
-               }
-       }
-
-       @Override
-       public void computeUnclosedBranchStack() { // Computes this.openBranches from both regular inputs and the broadcast inputs; runs at most once.
-               if (this.openBranches != null) {
-                       return; // already computed
-               }
-
-               // handle the data flow branching for the regular inputs
-               
addClosedBranches(getFirstPredecessorNode().closedBranchingNodes);
-               
addClosedBranches(getSecondPredecessorNode().closedBranchingNodes);
-               
-               List<UnclosedBranchDescriptor> result1 = 
getFirstPredecessorNode().getBranchesForParent(getFirstIncomingConnection());
-               List<UnclosedBranchDescriptor> result2 = 
getSecondPredecessorNode().getBranchesForParent(getSecondIncomingConnection());
-
-               ArrayList<UnclosedBranchDescriptor> inputsMerged = new 
ArrayList<UnclosedBranchDescriptor>();
-               mergeLists(result1, result2, inputsMerged, true);
-               
-               // handle the data flow branching for the broadcast inputs
-               List<UnclosedBranchDescriptor> result = 
computeUnclosedBranchStackForBroadcastInputs(inputsMerged);
-               // a null or empty result is stored as an empty list, so openBranches is non-null after this call
-               this.openBranches = (result == null || result.isEmpty()) ? 
Collections.<UnclosedBranchDescriptor>emptyList() : result;
-       }
-
-       @Override
-       public SemanticProperties getSemanticProperties() { // Delegates to the semantic properties of getOperator().
-               return getOperator().getSemanticProperties();
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //                                     Miscellaneous
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public void accept(Visitor<OptimizerNode> visitor) { // Visits both predecessors and all broadcast sources between preVisit(this) and postVisit(this).
-               if (visitor.preVisit(this)) { // a false result skips the inputs and postVisit for this node
-                       if (this.input1 == null || this.input2 == null) {
-                               throw new CompilerException(); // both input connections must be set before traversal
-                       }
-                       // order: first input, second input, then broadcast inputs
-                       getFirstPredecessorNode().accept(visitor);
-                       getSecondPredecessorNode().accept(visitor);
-                       
-                       for (DagConnection connection : 
getBroadcastConnections()) {
-                               connection.getSource().accept(visitor);
-                       }
-                       
-                       visitor.postVisit(this);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
deleted file mode 100644
index 45ecdac..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-
-
-public class UnaryOperatorNode extends SingleInputNode { // A SingleInputNode whose name and operator descriptors are supplied by the caller.
-       // candidate operator descriptors, returned as-is by getPossibleProperties()
-       private final List<OperatorDescriptorSingle> operator;
-       // name returned by getName()
-       private final String name;
-
-
-       // varargs convenience constructor; wraps the array with Arrays.asList and delegates
-       public UnaryOperatorNode(String name, FieldSet keys, 
OperatorDescriptorSingle ... operators) {
-               this(name, keys, Arrays.asList(operators));
-       }
-       // stores the given list reference directly (no copy is made)
-       public UnaryOperatorNode(String name, FieldSet keys, 
List<OperatorDescriptorSingle> operators) {
-               super(keys);
-               
-               this.operator = operators;
-               this.name = name;
-       }
-
-       @Override
-       protected List<OperatorDescriptorSingle> getPossibleProperties() {
-               return this.operator;
-       }
-
-       @Override
-       public String getName() {
-               return this.name;
-       }
-
-       @Override
-       public SemanticProperties getSemanticProperties() {
-               return new 
SingleInputSemanticProperties.AllFieldsForwardedProperties(); // a new instance on every call
-       }
-
-       @Override
-       protected void computeOperatorSpecificDefaultEstimates(DataStatistics 
statistics) {
-               // we have no estimates by default
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
deleted file mode 100644
index e85f289..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
+++ /dev/null
@@ -1,589 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import 
org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
-import org.apache.flink.api.common.operators.base.DeltaIterationBase;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.traversals.InterestingPropertyVisitor;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.InterestingProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
-import org.apache.flink.optimizer.operators.SolutionSetDeltaOperator;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.NamedChannel;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plan.WorksetPlanNode;
-import 
org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
-import org.apache.flink.optimizer.util.NoOpBinaryUdfOp;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.types.Nothing;
-import org.apache.flink.util.Visitor;
-
-/**
- * A node in the optimizer's program representation for a workset iteration.
- */
-public class WorksetIterationNode extends TwoInputNode implements 
IterationNode {
-       
-       private static final int DEFAULT_COST_WEIGHT = 20; // used when the iteration reports no positive maximum number of iterations
-       
-       
-       private final FieldList solutionSetKeyFields; // built from iteration.getSolutionSetKeyFields(); never empty (checked in the constructor)
-       
-       private final GlobalProperties partitionedProperties; // hash-partitioned on solutionSetKeyFields (set in the constructor)
-       
-       private final List<OperatorDescriptorDual> dataProperties; // singleton list holding a WorksetOpDescriptor; returned by getPossibleProperties()
-       
-       private SolutionSetNode solutionSetNode; // assigned in setPartialSolution
-       
-       private WorksetNode worksetNode; // assigned in setPartialSolution
-       
-       private OptimizerNode solutionSetDelta; // after setNextPartialSolution: the auxiliary "Solution-Set Delta" node, not the node passed in
-       
-       private OptimizerNode nextWorkset; // after setNextPartialSolution: the given node, or a NoOpNode injected behind it
-       
-       private DagConnection solutionSetDeltaRootConnection; // connects solutionSetDelta to singleRoot
-       
-       private DagConnection nextWorksetRootConnection; // connects nextWorkset to singleRoot
-       
-       private SingleRootJoiner singleRoot; // created in setNextPartialSolution; joins both step function results
-       
-       private boolean solutionDeltaImmediatelyAfterSolutionJoin; // set when the delta node is a TwoInputNode with solutionSetNode as a direct predecessor
-       
-       private final int costWeight; // maximum number of iterations (or the default), capped at OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Creates a new optimizer node for the given delta (workset) iteration.
-        * 
-        * @param iteration The iteration operator that the node represents.
-        */
-       public WorksetIterationNode(DeltaIterationBase<?, ?> iteration) {
-               super(iteration);
-               
-               final int[] ssKeys = iteration.getSolutionSetKeyFields();
-               if (ssKeys == null || ssKeys.length == 0) { // a solution set without key fields is rejected
-                       throw new CompilerException("Invalid WorksetIteration: 
No key fields defined for the solution set.");
-               }
-               this.solutionSetKeyFields = new FieldList(ssKeys);
-               this.partitionedProperties = new GlobalProperties();
-               
this.partitionedProperties.setHashPartitioned(this.solutionSetKeyFields);
-               // cost weight: the maximum number of iterations if positive, otherwise the default; capped below
-               int weight = iteration.getMaximumNumberOfIterations() > 0 ? 
-                       iteration.getMaximumNumberOfIterations() : 
DEFAULT_COST_WEIGHT;
-                       
-               if (weight > OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT) {
-                       weight = OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT;
-               }
-               this.costWeight = weight; 
-               // the only operator descriptor offered by this node
-               this.dataProperties = 
Collections.<OperatorDescriptorDual>singletonList(new 
WorksetOpDescriptor(this.solutionSetKeyFields));
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       public DeltaIterationBase<?, ?> getIterationContract() { // Returns getOperator() cast to DeltaIterationBase.
-               return (DeltaIterationBase<?, ?>) getOperator();
-       }
-       
-       public SolutionSetNode getSolutionSetNode() { // Returns the solution set node; setPartialSolution assigns it.
-               return this.solutionSetNode;
-       }
-       
-       public WorksetNode getWorksetNode() { // Returns the workset node; setPartialSolution assigns it.
-               return this.worksetNode;
-       }
-       
-       public OptimizerNode getNextWorkset() { // Returns the node stored by setNextPartialSolution, which may be an injected NoOpNode.
-               return this.nextWorkset;
-       }
-       
-       public OptimizerNode getSolutionSetDelta() { // Returns the auxiliary "Solution-Set Delta" node created in setNextPartialSolution, not the node originally passed in.
-               return this.solutionSetDelta;
-       }
-
-       public void setPartialSolution(SolutionSetNode solutionSetNode, // Stores the solution set and workset nodes; a call after either field is already set fails.
WorksetNode worksetNode) {
-               if (this.solutionSetNode != null || this.worksetNode != null) {
-                       throw new IllegalStateException("Error: Initializing 
WorksetIterationNode multiple times.");
-               }
-               this.solutionSetNode = solutionSetNode;
-               this.worksetNode = worksetNode;
-       }
-       
-       public void setNextPartialSolution(OptimizerNode solutionSetDelta, // Wires the step function results: puts a "Solution-Set Delta" node behind the delta and joins delta and next workset under a SingleRootJoiner.
OptimizerNode nextWorkset,
-                                                                               
ExecutionMode executionMode) {
-
-               // check whether the next partial solution is itself the join 
with
-               // the partial solution (so we can potentially do direct 
updates)
-               if (solutionSetDelta instanceof TwoInputNode) {
-                       TwoInputNode solutionDeltaTwoInput = (TwoInputNode) 
solutionSetDelta;
-                       if (solutionDeltaTwoInput.getFirstPredecessorNode() == 
this.solutionSetNode ||
-                               
solutionDeltaTwoInput.getSecondPredecessorNode() == this.solutionSetNode)
-                       {
-                               this.solutionDeltaImmediatelyAfterSolutionJoin 
= true;
-                       }
-               }
-               
-               // there needs to be at least one node in the workset path, so
-               // if the next workset is equal to the workset, we need to 
inject a no-op node
-               if (nextWorkset == worksetNode || nextWorkset instanceof 
BinaryUnionNode) { // a BinaryUnionNode as next workset also gets the no-op
-                       NoOpNode noop = new NoOpNode();
-                       noop.setDegreeOfParallelism(getParallelism());
-
-                       DagConnection noOpConn = new DagConnection(nextWorkset, 
noop, executionMode);
-                       noop.setIncomingConnection(noOpConn);
-                       nextWorkset.addOutgoingConnection(noOpConn);
-                       // from here on, the no-op node is treated as the next workset
-                       nextWorkset = noop;
-               }
-               
-               // attach an extra node to the solution set delta for the cases 
where we need to repartition
-               UnaryOperatorNode solutionSetDeltaUpdateAux = new 
UnaryOperatorNode("Solution-Set Delta", getSolutionSetKeyFields(),
-                               new 
SolutionSetDeltaOperator(getSolutionSetKeyFields()));
-               
solutionSetDeltaUpdateAux.setDegreeOfParallelism(getParallelism());
-
-               DagConnection conn = new DagConnection(solutionSetDelta, 
solutionSetDeltaUpdateAux, executionMode);
-               solutionSetDeltaUpdateAux.setIncomingConnection(conn);
-               solutionSetDelta.addOutgoingConnection(conn);
-               // the fields keep the auxiliary node and the (possibly wrapped) next workset
-               this.solutionSetDelta = solutionSetDeltaUpdateAux;
-               this.nextWorkset = nextWorkset;
-               // join both step function results under one root
-               this.singleRoot = new SingleRootJoiner();
-               this.solutionSetDeltaRootConnection = new 
DagConnection(solutionSetDeltaUpdateAux,
-                                                                               
                        this.singleRoot, executionMode);
-
-               this.nextWorksetRootConnection = new DagConnection(nextWorkset, 
this.singleRoot, executionMode);
-               this.singleRoot.setInputs(this.solutionSetDeltaRootConnection, 
this.nextWorksetRootConnection);
-               
-               
solutionSetDeltaUpdateAux.addOutgoingConnection(this.solutionSetDeltaRootConnection);
-               
nextWorkset.addOutgoingConnection(this.nextWorksetRootConnection);
-       }
-       
-       public int getCostWeight() { // Returns the cost weight computed in the constructor.
-               return this.costWeight;
-       }
-       
-       public TwoInputNode getSingleRootOfStepFunction() { // Returns the SingleRootJoiner created in setNextPartialSolution.
-               return this.singleRoot;
-       }
-       
-       public FieldList getSolutionSetKeyFields() { // Returns the key fields of the solution set, as read from the iteration in the constructor.
-               return this.solutionSetKeyFields;
-       }
-       
-       public OptimizerNode getInitialSolutionSetPredecessorNode() { // The initial solution set is the first input of this node.
-               return getFirstPredecessorNode();
-       }
-       
-       /**
-        * Gets the node that produces the initial workset, which is the second input of this node.
-        *
-        * @return The second predecessor node.
-        */
-       public OptimizerNode getInitialWorksetPredecessorNode() {
-               final OptimizerNode initialWorkset = getSecondPredecessorNode();
-               return initialWorkset;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Gets the display name of this node.
-        *
-        * @return The constant string {@code "Workset Iteration"}.
-        */
-       @Override
-       public String getName() {
-               final String name = "Workset Iteration";
-               return name;
-       }
-
-       /**
-        * Gets the semantic properties of this node. A fresh, empty instance is returned on every call.
-        *
-        * @return A new {@code EmptySemanticProperties} object.
-        */
-       @Override
-       public SemanticProperties getSemanticProperties() {
-               final SemanticProperties noProperties = new EmptySemanticProperties();
-               return noProperties;
-       }
-
-       /**
-        * Intentionally empty: this node does not read any stub annotations.
-        */
-       protected void readStubAnnotations() {
-               // nothing to do
-       }
-       
-       @Override
-       protected void computeOperatorSpecificDefaultEstimates(DataStatistics 
statistics) {
-               this.estimatedOutputSize = 
getFirstPredecessorNode().getEstimatedOutputSize();
-               this.estimatedNumRecords = 
getFirstPredecessorNode().getEstimatedNumRecords();
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //                             Properties and Optimization
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Gets the operator descriptors that describe the possible properties of this node.
-        *
-        * @return The list stored in the {@code dataProperties} field.
-        */
-       @Override
-       protected List<OperatorDescriptorDual> getPossibleProperties() {
-               return dataProperties;
-       }
-       
-       /**
-        * Computes the interesting properties inside the step function and for the two inputs of the
-        * iteration.
-        *
-        * <p>Interesting properties that arrive from the successors of the iteration are ignored, because
-        * the solution set is always hash partitioned on its key fields. The step function is traversed
-        * twice: the root of the next-workset path needs the interesting properties that the workset
-        * source collected during the first traversal. The second traversal covers only the workset path.
-        *
-        * @param estimator The cost estimator handed to the interesting property visitor.
-        */
-       @Override
-       public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
-               // requested properties for the solution set: hash partitioned on the solution set key,
-               // with no particular local properties
-               final RequestedGlobalProperties keyPartitioning = new RequestedGlobalProperties();
-               keyPartitioning.setHashPartitioned(this.solutionSetKeyFields);
-
-               final InterestingProperties solutionSetProps = new InterestingProperties();
-               solutionSetProps.addGlobalProperties(keyPartitioning);
-               solutionSetProps.addLocalProperties(new RequestedLocalProperties());
-
-               // 1st pass: start with the trivial properties for the next workset and with the key
-               // partitioning for the solution set delta
-               this.nextWorksetRootConnection.setInterestingProperties(new InterestingProperties());
-               this.solutionSetDeltaRootConnection.setInterestingProperties(solutionSetProps.clone());
-
-               final InterestingPropertyVisitor propertyVisitor = new InterestingPropertyVisitor(estimator);
-               this.nextWorkset.accept(propertyVisitor);
-               this.solutionSetDelta.accept(propertyVisitor);
-
-               // copy what the workset source collected; this becomes the root properties of the 2nd pass
-               final InterestingProperties collectedAtWorkset = this.worksetNode.getInterestingProperties();
-               final InterestingProperties rootPropsSecondPass = new InterestingProperties();
-               rootPropsSecondPass.getGlobalProperties().addAll(collectedAtWorkset.getGlobalProperties());
-               rootPropsSecondPass.getLocalProperties().addAll(collectedAtWorkset.getLocalProperties());
-
-               // reset the workset path before it is traversed again
-               this.nextWorksetRootConnection.clearInterestingProperties();
-               this.nextWorkset.accept(InterestingPropertiesClearer.INSTANCE);
-
-               // 2nd pass (workset path only)
-               this.nextWorksetRootConnection.setInterestingProperties(rootPropsSecondPass);
-               this.nextWorkset.accept(propertyVisitor);
-
-               // the workset input gets the properties collected at the workset source, plus the
-               // trivial (empty) global and local requests
-               final InterestingProperties worksetInputProps = this.worksetNode.getInterestingProperties().clone();
-               worksetInputProps.addGlobalProperties(new RequestedGlobalProperties());
-               worksetInputProps.addLocalProperties(new RequestedLocalProperties());
-               this.input2.setInterestingProperties(worksetInputProps);
-
-               // the solution set input must be hash partitioned, so that is its only interesting property
-               this.input1.setInterestingProperties(solutionSetProps);
-       }
-       
-       /**
-        * Clears the interesting properties of this node, of the two connections into the single root
-        * of the step function, and of all nodes reachable from the next workset and the solution set delta.
-        */
-       @Override
-       public void clearInterestingProperties() {
-               super.clearInterestingProperties();
-
-               // connections from the step function results to the single root
-               nextWorksetRootConnection.clearInterestingProperties();
-               solutionSetDeltaRootConnection.clearInterestingProperties();
-
-               // nodes inside the step function
-               nextWorkset.accept(InterestingPropertiesClearer.INSTANCE);
-               solutionSetDelta.accept(InterestingPropertiesClearer.INSTANCE);
-       }
-       
-       /**
-        * Enumerates the plan candidates of the step function for the given pair of input channels and
-        * adds one {@code WorksetIterationPlanNode} to {@code target} for every branch-compatible
-        * combination of a solution set delta plan and a next workset plan.
-        *
-        * <p>The parameters {@code operator}, {@code broadcastPlanChannels},
-        * {@code globPropsReqSolutionSet} and {@code locPropsReqSolutionSet} are not read by this
-        * implementation.
-        *
-        * @param operator The operator descriptor.
-        * @param solutionSetIn The channel for the initial solution set.
-        * @param worksetIn The channel for the initial workset.
-        * @param broadcastPlanChannels The broadcast channels.
-        * @param target The list to which the created plan candidates are added.
-        * @param estimator The cost estimator used to enumerate and cost the step function plans.
-        * @param globPropsReqSolutionSet The global properties requested from the solution set input.
-        * @param globPropsReqWorkset The global properties requested from the workset input; used to
-        *                            re-establish the properties at the end of the step function.
-        * @param locPropsReqSolutionSet The local properties requested from the solution set input.
-        * @param locPropsReqWorkset The local properties requested from the workset input; used to
-        *                           re-establish the properties at the end of the step function.
-        *
-        * @throws CompilerException Thrown, if a solution set delta candidate is not hash partitioned on
-        *                           the solution set key fields, or if a delta node that is to be
-        *                           skipped is not a plain no-op with no local strategy.
-        */
-       @Override
-       protected void instantiate(OperatorDescriptorDual operator, Channel solutionSetIn, Channel worksetIn,
-                       List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator,
-                       RequestedGlobalProperties globPropsReqSolutionSet, RequestedGlobalProperties globPropsReqWorkset,
-                       RequestedLocalProperties locPropsReqSolutionSet, RequestedLocalProperties locPropsReqWorkset)
-       {
-               // check for pipeline breaking using hash join with build on the solution set side
-               placePipelineBreakersIfNecessary(DriverStrategy.HYBRIDHASH_BUILD_FIRST, solutionSetIn, worksetIn);
-
-               // NOTES ON THE ENUMERATION OF THE STEP FUNCTION PLANS:
-               // Whenever we instantiate the iteration, we enumerate new candidates for the step function.
-               // That way, we make sure that for each candidate for the initial partial solution, we have
-               // a fitting candidate for the step function (often, work is pushed out of the step function).
-               // Among the candidates of the step function, we keep only those that meet the requested
-               // properties of the current candidate initial partial solution. That makes sure these
-               // properties exist at the beginning of every iteration.
-
-               // 1) Because we enumerate multiple times, we may need to clean the cached plans
-               //    before starting another enumeration
-               this.nextWorkset.accept(PlanCacheCleaner.INSTANCE);
-               this.solutionSetDelta.accept(PlanCacheCleaner.INSTANCE);
-
-               // 2) Give the partial solution the properties of the current candidate for the initial
-               //    partial solution. This concerns currently only the workset.
-               this.worksetNode.setCandidateProperties(worksetIn.getGlobalProperties(), worksetIn.getLocalProperties(), worksetIn);
-               this.solutionSetNode.setCandidateProperties(this.partitionedProperties, new LocalProperties(), solutionSetIn);
-
-               final SolutionSetPlanNode sspn = this.solutionSetNode.getCurrentSolutionSetPlanNode();
-               final WorksetPlanNode wspn = this.worksetNode.getCurrentWorksetPlanNode();
-
-               // 3) Get the alternative plans
-               List<PlanNode> solutionSetDeltaCandidates = this.solutionSetDelta.getAlternativePlans(estimator);
-               List<PlanNode> worksetCandidates = this.nextWorkset.getAlternativePlans(estimator);
-
-               // 4) Throw away all that are not compatible with the properties currently requested to the
-               //    initial partial solution
-
-               // Make sure that the workset candidates fulfill the input requirements
-               {
-                       List<PlanNode> newCandidates = new ArrayList<PlanNode>();
-
-                       for (Iterator<PlanNode> planDeleter = worksetCandidates.iterator(); planDeleter.hasNext(); ) {
-                               PlanNode candidate = planDeleter.next();
-
-                               GlobalProperties atEndGlobal = candidate.getGlobalProperties();
-                               LocalProperties atEndLocal = candidate.getLocalProperties();
-
-                               FeedbackPropertiesMeetRequirementsReport report =
-                                               candidate.checkPartialSolutionPropertiesMet(wspn, atEndGlobal, atEndLocal);
-
-                               if (report == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
-                                       // the candidate depends on the workset only through a broadcast variable;
-                                       // nothing to check, the candidate is kept as it is
-                               }
-                               else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
-                                       // attach a no-op node through which we create the properties of the original input
-                                       Channel toNoOp = new Channel(candidate);
-                                       globPropsReqWorkset.parameterizeChannel(toNoOp, false,
-                                                       nextWorksetRootConnection.getDataExchangeMode(), false);
-                                       locPropsReqWorkset.parameterizeChannel(toNoOp);
-
-                                       UnaryOperatorNode rebuildWorksetPropertiesNode =
-                                                       new UnaryOperatorNode("Rebuild Workset Properties", FieldList.EMPTY_LIST);
-
-                                       rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getParallelism());
-
-                                       SingleInputPlanNode rebuildWorksetPropertiesPlanNode = new SingleInputPlanNode(
-                                                       rebuildWorksetPropertiesNode, "Rebuild Workset Properties",
-                                                       toNoOp, DriverStrategy.UNARY_NO_OP);
-                                       rebuildWorksetPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(),
-                                                       toNoOp.getLocalProperties());
-                                       estimator.costOperator(rebuildWorksetPropertiesPlanNode);
-
-                                       GlobalProperties atEndGlobalModified = rebuildWorksetPropertiesPlanNode.getGlobalProperties();
-                                       LocalProperties atEndLocalModified = rebuildWorksetPropertiesPlanNode.getLocalProperties();
-
-                                       // the rebuilt candidate is only worth keeping if the no-op actually changed the
-                                       // properties and the changed properties are no longer reported as NOT_MET
-                                       if (!(atEndGlobalModified.equals(atEndGlobal) && atEndLocalModified.equals(atEndLocal))) {
-                                               FeedbackPropertiesMeetRequirementsReport report2 =
-                                                               candidate.checkPartialSolutionPropertiesMet(wspn, atEndGlobalModified, atEndLocalModified);
-                                               if (report2 != FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
-                                                       newCandidates.add(rebuildWorksetPropertiesPlanNode);
-                                               }
-                                       }
-
-                                       // remove the original operator; the modified candidate (if any) is added below
-                                       planDeleter.remove();
-                               }
-                       }
-
-                       worksetCandidates.addAll(newCandidates);
-               }
-
-               if (worksetCandidates.isEmpty()) {
-                       return;
-               }
-
-               // sanity check the solution set delta
-               for (PlanNode solutionSetDeltaCandidate : solutionSetDeltaCandidates) {
-                       SingleInputPlanNode candidate = (SingleInputPlanNode) solutionSetDeltaCandidate;
-                       GlobalProperties gp = candidate.getGlobalProperties();
-
-                       if (gp.getPartitioning() != PartitioningProperty.HASH_PARTITIONED || gp.getPartitioningFields() == null ||
-                                       !gp.getPartitioningFields().equals(this.solutionSetKeyFields)) {
-                               throw new CompilerException("Bug: The solution set delta is not partitioned.");
-                       }
-               }
-
-               // 5) Create a candidate for the Iteration Node for every remaining plan of the step function.
-
-               final GlobalProperties gp = new GlobalProperties();
-               gp.setHashPartitioned(this.solutionSetKeyFields);
-               gp.addUniqueFieldCombination(this.solutionSetKeyFields);
-
-               LocalProperties lp = LocalProperties.EMPTY.addUniqueFields(this.solutionSetKeyFields);
-
-               // take all combinations of solution set delta and workset plans
-               for (PlanNode solutionSetCandidate : solutionSetDeltaCandidates) {
-                       for (PlanNode worksetCandidate : worksetCandidates) {
-                               // check whether they have the same operator at their latest branching point
-                               if (this.singleRoot.areBranchCompatible(solutionSetCandidate, worksetCandidate)) {
-
-                                       SingleInputPlanNode siSolutionDeltaCandidate = (SingleInputPlanNode) solutionSetCandidate;
-
-                                       // The node that finally acts as the solution set delta. It is kept in its own
-                                       // variable: overwriting the loop variable 'solutionSetCandidate' here would make
-                                       // the following workset candidates be checked and cast against the predecessor
-                                       // of the delta node instead of the delta node itself.
-                                       final PlanNode effectiveDeltaCandidate;
-                                       final boolean immediateDeltaUpdate;
-
-                                       // check whether we need a dedicated solution set delta operator, or whether we can update on the fly
-                                       if (siSolutionDeltaCandidate.getInput().getShipStrategy() == ShipStrategyType.FORWARD &&
-                                                       this.solutionDeltaImmediatelyAfterSolutionJoin)
-                                       {
-                                               // we do not need this extra node. we can make the predecessor the delta
-                                               // sanity check the node and connection
-                                               if (siSolutionDeltaCandidate.getDriverStrategy() != DriverStrategy.UNARY_NO_OP ||
-                                                               siSolutionDeltaCandidate.getInput().getLocalStrategy() != LocalStrategy.NONE)
-                                               {
-                                                       throw new CompilerException("Invalid Solution set delta node.");
-                                               }
-
-                                               effectiveDeltaCandidate = siSolutionDeltaCandidate.getInput().getSource();
-                                               immediateDeltaUpdate = true;
-                                       } else {
-                                               // was not partitioned, we need to keep this node.
-                                               // mark that we materialize the input
-                                               siSolutionDeltaCandidate.getInput().setTempMode(TempMode.PIPELINE_BREAKER);
-                                               effectiveDeltaCandidate = solutionSetCandidate;
-                                               immediateDeltaUpdate = false;
-                                       }
-
-                                       WorksetIterationPlanNode wsNode = new WorksetIterationPlanNode(this,
-                                                       "WorksetIteration ("+this.getOperator().getName()+")", solutionSetIn,
-                                                       worksetIn, sspn, wspn, worksetCandidate, effectiveDeltaCandidate);
-                                       wsNode.setImmediateSolutionSetUpdate(immediateDeltaUpdate);
-                                       wsNode.initProperties(gp, lp);
-                                       target.add(wsNode);
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Computes the stack of unclosed branches for this node, unless it has been computed before.
-        *
-        * <p>The branches of the two inputs are merged first, then the result is merged with the open
-        * branches of the single root of the step function, and finally the broadcast inputs are taken
-        * into account. The result is stored in {@code openBranches} and is never {@code null} afterwards.
-        */
-       @Override
-       public void computeUnclosedBranchStack() {
-               // already computed
-               if (this.openBranches != null) {
-                       return;
-               }
-
-               // IMPORTANT: The closed branches of the two inputs must be registered first,
-               // because the runtime iteration head effectively joins them
-               addClosedBranches(getFirstPredecessorNode().closedBranchingNodes);
-               addClosedBranches(getSecondPredecessorNode().closedBranchingNodes);
-
-               List<UnclosedBranchDescriptor> fromSolutionSetInput =
-                               getFirstPredecessorNode().getBranchesForParent(getFirstIncomingConnection());
-               List<UnclosedBranchDescriptor> fromWorksetInput =
-                               getSecondPredecessorNode().getBranchesForParent(getSecondIncomingConnection());
-
-               // merging also marks which branches are joined here (in the head)
-               ArrayList<UnclosedBranchDescriptor> mergedInputs = new ArrayList<UnclosedBranchDescriptor>();
-               mergeLists(fromSolutionSetInput, fromWorksetInput, mergedInputs, true);
-
-               addClosedBranches(getSingleRootOfStepFunction().closedBranchingNodes);
-
-               ArrayList<UnclosedBranchDescriptor> mergedWithStepFunction = new ArrayList<UnclosedBranchDescriptor>();
-               List<UnclosedBranchDescriptor> fromStepFunction = getSingleRootOfStepFunction().openBranches;
-               mergeLists(mergedInputs, fromStepFunction, mergedWithStepFunction, true);
-
-               // handle the data flow branching for the broadcast inputs
-               List<UnclosedBranchDescriptor> withBroadcastInputs =
-                               computeUnclosedBranchStackForBroadcastInputs(mergedWithStepFunction);
-
-               if (withBroadcastInputs == null || withBroadcastInputs.isEmpty()) {
-                       this.openBranches = Collections.<UnclosedBranchDescriptor>emptyList();
-               } else {
-                       this.openBranches = withBroadcastInputs;
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //                      Iteration Specific Traversals
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * Lets the given visitor traverse the step function, starting at its single root.
-        *
-        * @param visitor The visitor passed to the {@code accept} method of the single root node.
-        */
-       public void acceptForStepFunction(Visitor<OptimizerNode> visitor) {
-               singleRoot.accept(visitor);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //                             Utility Classes
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Operator descriptor for the workset iteration. It requests hash partitioning on the given keys
-        * for the first entry of the global properties pair and nothing for the second entry, accepts all
-        * combinations of produced properties, and does not support instantiation or the computation of
-        * output properties.
-        */
-       private static final class WorksetOpDescriptor extends OperatorDescriptorDual {
-
-               /**
-                * @param solutionSetKeys The keys handed to the super class as the first key set; the
-                *                        second key set is {@code null}.
-                */
-               private WorksetOpDescriptor(FieldList solutionSetKeys) {
-                       super(solutionSetKeys, null);
-               }
-
-               /** Always returns {@code DriverStrategy.NONE}. */
-               @Override
-               public DriverStrategy getStrategy() {
-                       return DriverStrategy.NONE;
-               }
-
-               /** Creates a single pair: hash partitioned on {@code keys1}, and unconstrained. */
-               @Override
-               protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
-                       final RequestedGlobalProperties hashPartitionedOnKeys = new RequestedGlobalProperties();
-                       hashPartitionedOnKeys.setHashPartitioned(this.keys1);
-
-                       final RequestedGlobalProperties unconstrained = new RequestedGlobalProperties();
-                       final GlobalPropertiesPair pair = new GlobalPropertiesPair(hashPartitionedOnKeys, unconstrained);
-                       return Collections.singletonList(pair);
-               }
-
-               /** Creates a single pair of unconstrained local properties, so all properties are possible. */
-               @Override
-               protected List<LocalPropertiesPair> createPossibleLocalProperties() {
-                       final RequestedLocalProperties anyFirst = new RequestedLocalProperties();
-                       final RequestedLocalProperties anySecond = new RequestedLocalProperties();
-                       final LocalPropertiesPair pair = new LocalPropertiesPair(anyFirst, anySecond);
-                       return Collections.singletonList(pair);
-               }
-
-               /** Every combination of global properties is accepted. */
-               @Override
-               public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
-                               GlobalProperties produced1, GlobalProperties produced2) {
-                       return true;
-               }
-
-               /** Every combination of local properties is accepted. */
-               @Override
-               public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
-                               LocalProperties produced1, LocalProperties produced2) {
-                       return true;
-               }
-
-               /** Not supported by this descriptor. */
-               @Override
-               public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
-                       throw new UnsupportedOperationException();
-               }
-
-               /** Not supported by this descriptor. */
-               @Override
-               public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) {
-                       throw new UnsupportedOperationException();
-               }
-
-               /** Not supported by this descriptor. */
-               @Override
-               public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
-                       throw new UnsupportedOperationException();
-               }
-       }
-       
-       /**
-        * Internal utility node with two inputs and a parallelism of one. It wraps a no-op binary
-        * operator, offers no operator descriptors, and computes no estimates.
-        */
-       public static class SingleRootJoiner extends TwoInputNode {
-
-               /** Creates the joiner around a no-op binary operator and fixes its parallelism to 1. */
-               SingleRootJoiner() {
-                       super(new NoOpBinaryUdfOp<Nothing>(new NothingTypeInfo()));
-
-                       setDegreeOfParallelism(1);
-               }
-
-               /**
-                * Sets the two incoming connections of this node.
-                *
-                * @param input1 The connection stored as the first input.
-                * @param input2 The connection stored as the second input.
-                */
-               public void setInputs(DagConnection input1, DagConnection input2) {
-                       this.input1 = input1;
-                       this.input2 = input2;
-               }
-
-               /** Returns the constant name {@code "Internal Utility Node"}. */
-               @Override
-               public String getName() {
-                       final String name = "Internal Utility Node";
-                       return name;
-               }
-
-               /** Returns an empty list; this node has no operator descriptors. */
-               @Override
-               protected List<OperatorDescriptorDual> getPossibleProperties() {
-                       final List<OperatorDescriptorDual> none = Collections.emptyList();
-                       return none;
-               }
-
-               /** Intentionally empty. */
-               @Override
-               protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-                       // no estimates are needed here
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
deleted file mode 100644
index 3b05aba..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import 
org.apache.flink.api.common.operators.base.DeltaIterationBase.WorksetPlaceHolder;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.WorksetPlanNode;
-
-/**
- * The optimizer's internal representation of the workset that is input to the step function of a
- * workset (delta) iteration. The node is bound to the {@link WorksetIterationNode} it belongs to.
- */
-public class WorksetNode extends AbstractPartialSolutionNode {
-
-       private final WorksetIterationNode iterationNode;
-
-
-       /**
-        * Creates a new workset node.
-        *
-        * @param psph The workset place holder operator that this node represents.
-        * @param iterationNode The iteration node that this workset belongs to.
-        */
-       public WorksetNode(WorksetPlaceHolder<?> psph, WorksetIterationNode iterationNode) {
-               super(psph);
-               this.iterationNode = iterationNode;
-       }
-
-       // --------------------------------------------------------------------------------------------
-
-       /**
-        * Creates the single plan candidate of this node from the given properties and caches it.
-        *
-        * @param gProps The global properties of the candidate.
-        * @param lProps The local properties of the candidate.
-        * @param initialInput The channel of the initial workset input.
-        *
-        * @throws IllegalStateException Thrown, if a plan candidate is already cached.
-        */
-       public void setCandidateProperties(GlobalProperties gProps, LocalProperties lProps, Channel initialInput) {
-               if (this.cachedPlans != null) {
-                       throw new IllegalStateException("The candidate properties of the workset have already been set.");
-               }
-
-               WorksetPlanNode wspn = new WorksetPlanNode(this, "Workset ("+this.getOperator().getName()+")", gProps, lProps, initialInput);
-               this.cachedPlans = Collections.<PlanNode>singletonList(wspn);
-       }
-
-       /**
-        * Gets the plan candidate created by the latest call to
-        * {@link #setCandidateProperties(GlobalProperties, LocalProperties, Channel)}.
-        *
-        * @return The cached workset plan node.
-        *
-        * @throws IllegalStateException Thrown, if no plan candidate is cached.
-        */
-       public WorksetPlanNode getCurrentWorksetPlanNode() {
-               if (this.cachedPlans == null) {
-                       throw new IllegalStateException("The candidate properties of the workset have not been set.");
-               }
-
-               return (WorksetPlanNode) this.cachedPlans.get(0);
-       }
-
-       /**
-        * Gets the iteration node that this workset belongs to.
-        *
-        * @return The workset iteration node.
-        */
-       public WorksetIterationNode getIterationNode() {
-               return this.iterationNode;
-       }
-
-       /**
-        * Copies the estimates from the node that produces the initial workset of the iteration.
-        *
-        * @param statistics The data statistics; not read by this implementation.
-        */
-       @Override
-       public void computeOutputEstimates(DataStatistics statistics) {
-               copyEstimates(this.iterationNode.getInitialWorksetPredecessorNode());
-       }
-
-       // --------------------------------------------------------------------------------------------
-
-       /**
-        * Gets the workset place holder operator that this node represents.
-        *
-        * @return The operator, cast to {@code WorksetPlaceHolder}.
-        */
-       @Override
-       public WorksetPlaceHolder<?> getOperator() {
-               return (WorksetPlaceHolder<?>) super.getOperator();
-       }
-
-       @Override
-       public String getName() {
-               return "Workset";
-       }
-
-       /**
-        * Computes the open branches of this node, unless they have been computed before. The workset
-        * takes over the closed branches and the branches of the source of the iteration's second
-        * (workset) input.
-        */
-       @Override
-       public void computeUnclosedBranchStack() {
-               if (this.openBranches != null) {
-                       return;
-               }
-
-               DagConnection worksetInput = this.iterationNode.getSecondIncomingConnection();
-               OptimizerNode worksetSource = worksetInput.getSource();
-
-               addClosedBranches(worksetSource.closedBranchingNodes);
-               List<UnclosedBranchDescriptor> fromInput = worksetSource.getBranchesForParent(worksetInput);
-               this.openBranches = (fromInput == null || fromInput.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : fromInput;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
deleted file mode 100644
index 57ba29d..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
+++ /dev/null
@@ -1,500 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataproperties;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.util.Utils;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class represents global properties of the data at a certain point in 
the plan.
- * Global properties are properties that describe data across different 
partitions, such as
- * whether the data is hash partitioned, range partitioned, replicated, etc.
- */
-public class GlobalProperties implements Cloneable {
-
-       public static final Logger LOG = 
LoggerFactory.getLogger(GlobalProperties.class);
-       
-       private PartitioningProperty partitioning;      // the type of partitioning
-       
-       private FieldList partitioningFields;           // the fields which are 
partitioned
-       
-       private Ordering ordering;                                      // 
order of the partitioned fields, if it is an ordered (range) partitioning
-       
-       private Set<FieldSet> uniqueFieldCombinations;
-       
-       private Partitioner<?> customPartitioner;
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Initializes the global properties with no partitioning.
-        */
-       public GlobalProperties() {
-               this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Sets these global properties to represent a hash partitioning.
-        * 
-        * @param partitionedFields The key fields on which the data is hash 
partitioned.
-        */
-       public void setHashPartitioned(FieldList partitionedFields) {
-               if (partitionedFields == null) {
-                       throw new NullPointerException();
-               }
-               
-               this.partitioning = PartitioningProperty.HASH_PARTITIONED;
-               this.partitioningFields = partitionedFields;
-               this.ordering = null;
-       }
-       
-
-       public void setRangePartitioned(Ordering ordering) {
-               if (ordering == null) {
-                       throw new NullPointerException();
-               }
-               
-               this.partitioning = PartitioningProperty.RANGE_PARTITIONED;
-               this.ordering = ordering;
-               this.partitioningFields = ordering.getInvolvedIndexes();
-       }
-       
-       public void setAnyPartitioning(FieldList partitionedFields) {
-               if (partitionedFields == null) {
-                       throw new NullPointerException();
-               }
-               
-               this.partitioning = PartitioningProperty.ANY_PARTITIONING;
-               this.partitioningFields = partitionedFields;
-               this.ordering = null;
-       }
-       
-       public void setRandomPartitioned() {
-               this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
-               this.partitioningFields = null;
-               this.ordering = null;
-       }
-       
-       public void setFullyReplicated() {
-               this.partitioning = PartitioningProperty.FULL_REPLICATION;
-               this.partitioningFields = null;
-               this.ordering = null;
-       }
-       
-       public void setForcedRebalanced() {
-               this.partitioning = PartitioningProperty.FORCED_REBALANCED;
-               this.partitioningFields = null;
-               this.ordering = null;
-       }
-       
-       public void setCustomPartitioned(FieldList partitionedFields, 
Partitioner<?> partitioner) {
-               if (partitionedFields == null || partitioner == null) {
-                       throw new NullPointerException();
-               }
-               
-               this.partitioning = PartitioningProperty.CUSTOM_PARTITIONING;
-               this.partitioningFields = partitionedFields;
-               this.ordering = null;
-               this.customPartitioner = partitioner;
-       }
-       
-       public void addUniqueFieldCombination(FieldSet fields) {
-               if (fields == null) {
-                       return;
-               }
-               if (this.uniqueFieldCombinations == null) {
-                       this.uniqueFieldCombinations = new HashSet<FieldSet>();
-               }
-               this.uniqueFieldCombinations.add(fields);
-       }
-       
-       public void clearUniqueFieldCombinations() {
-               if (this.uniqueFieldCombinations != null) {
-                       this.uniqueFieldCombinations = null;
-               }
-       }
-       
-       public Set<FieldSet> getUniqueFieldCombination() {
-               return this.uniqueFieldCombinations;
-       }
-       
-       public FieldList getPartitioningFields() {
-               return this.partitioningFields;
-       }
-       
-       public Ordering getPartitioningOrdering() {
-               return this.ordering;
-       }
-       
-       public PartitioningProperty getPartitioning() {
-               return this.partitioning;
-       }
-       
-       public Partitioner<?> getCustomPartitioner() {
-               return this.customPartitioner;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public boolean isPartitionedOnFields(FieldSet fields) {
-               if (this.partitioning.isPartitionedOnKey() && 
fields.isValidSubset(this.partitioningFields)) {
-                       return true;
-               } else if (this.uniqueFieldCombinations != null) {
-                       for (FieldSet set : this.uniqueFieldCombinations) {
-                               if (fields.isValidSubset(set)) {
-                                       return true;
-                               }
-                       }
-                       return false;
-               } else {
-                       return false;
-               }
-       }
-
-       public boolean isExactlyPartitionedOnFields(FieldList fields) {
-               return this.partitioning.isPartitionedOnKey() && 
fields.isExactMatch(this.partitioningFields);
-       }
-       
-       public boolean matchesOrderedPartitioning(Ordering o) {
-               if (this.partitioning == 
PartitioningProperty.RANGE_PARTITIONED) {
-                       if (this.ordering.getNumberOfFields() > 
o.getNumberOfFields()) {
-                               return false;
-                       }
-                       
-                       for (int i = 0; i < this.ordering.getNumberOfFields(); 
i++) {
-                               if (this.ordering.getFieldNumber(i) != 
o.getFieldNumber(i)) {
-                                       return false;
-                               }
-                               
-                               // if no order is requested for this field, everything is 
good
-                               final Order oo = o.getOrder(i);
-                               final Order to = this.ordering.getOrder(i);
-                               if (oo != Order.NONE) {
-                                       if (oo == Order.ANY) {
-                                               // if any order is requested, 
any order other than NONE is good
-                                               if (to == Order.NONE) {
-                                                       return false;
-                                               }
-                                       } else if (oo != to) {
-                                               // the orders must be equal
-                                               return false;
-                                       }
-                               }
-                       }
-                       return true;
-               } else {
-                       return false;
-               }
-       }
-
-       public boolean isFullyReplicated() {
-               return this.partitioning == 
PartitioningProperty.FULL_REPLICATION;
-       }
-
-       /**
-        * Checks whether the properties in this object are trivial, i.e. hold only 
standard values.
-        */
-       public boolean isTrivial() {
-               return partitioning == PartitioningProperty.RANDOM_PARTITIONED;
-       }
-
-       /**
-        * This method resets the properties to a state where no properties are 
given.
-        */
-       public void reset() {
-               this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
-               this.ordering = null;
-               this.partitioningFields = null;
-       }
-
-       /**
-        * Filters these GlobalProperties by the fields that are forwarded to 
the output
-        * as described by the SemanticProperties.
-        *
-        * @param props The semantic properties holding information about 
forwarded fields.
-        * @param input The index of the input.
-        * @return The filtered GlobalProperties
-        */
-       public GlobalProperties filterBySemanticProperties(SemanticProperties 
props, int input) {
-
-               if (props == null) {
-                       throw new NullPointerException("SemanticProperties may 
not be null.");
-               }
-
-               GlobalProperties gp = new GlobalProperties();
-
-               // filter partitioning
-               switch(this.partitioning) {
-                       case RANGE_PARTITIONED:
-                               // check if ordering is preserved
-                               Ordering newOrdering = new Ordering();
-                               for (int i = 0; i < 
this.ordering.getInvolvedIndexes().size(); i++) {
-                                       int sourceField = 
this.ordering.getInvolvedIndexes().get(i);
-                                       FieldSet targetField = 
props.getForwardingTargetFields(input, sourceField);
-
-                                       if (targetField == null || 
targetField.size() == 0) {
-                                               // partitioning is destroyed
-                                               newOrdering = null;
-                                               break;
-                                       } else {
-                                               // use any field of target 
fields for now. We should use something like field equivalence sets in the 
future.
-                                               if(targetField.size() > 1) {
-                                                       LOG.warn("Found that a 
field is forwarded to more than one target field in " +
-                                                                       
"semantic forwarded field information. Will only use the field with the lowest 
index.");
-                                               }
-                                               
newOrdering.appendOrdering(targetField.toArray()[0], this.ordering.getType(i), 
this.ordering.getOrder(i));
-                                       }
-                               }
-                               if(newOrdering != null) {
-                                       gp.partitioning = 
PartitioningProperty.RANGE_PARTITIONED;
-                                       gp.ordering = newOrdering;
-                                       gp.partitioningFields = 
newOrdering.getInvolvedIndexes();
-                               }
-                               break;
-                       case HASH_PARTITIONED:
-                       case ANY_PARTITIONING:
-                       case CUSTOM_PARTITIONING:
-                               FieldList newPartitioningFields = new 
FieldList();
-                               for (int sourceField : this.partitioningFields) 
{
-                                       FieldSet targetField = 
props.getForwardingTargetFields(input, sourceField);
-
-                                       if (targetField == null || 
targetField.size() == 0) {
-                                               newPartitioningFields = null;
-                                               break;
-                                       } else {
-                                               // use any field of target 
fields for now.  We should use something like field equivalence sets in the 
future.
-                                               if(targetField.size() > 1) {
-                                                       LOG.warn("Found that a 
field is forwarded to more than one target field in " +
-                                                                       
"semantic forwarded field information. Will only use the field with the lowest 
index.");
-                                               }
-                                               newPartitioningFields = 
newPartitioningFields.addField(targetField.toArray()[0]);
-                                       }
-                               }
-                               if(newPartitioningFields != null) {
-                                       gp.partitioning = this.partitioning;
-                                       gp.partitioningFields = 
newPartitioningFields;
-                                       gp.customPartitioner = 
this.customPartitioner;
-                               }
-                               break;
-                       case FORCED_REBALANCED:
-                       case FULL_REPLICATION:
-                       case RANDOM_PARTITIONED:
-                               gp.partitioning = this.partitioning;
-                               break;
-                       default:
-                               throw new RuntimeException("Unknown 
partitioning type.");
-               }
-
-               // filter unique field combinations
-               if (this.uniqueFieldCombinations != null) {
-                       Set<FieldSet> newUniqueFieldCombinations = new 
HashSet<FieldSet>();
-                       for (FieldSet fieldCombo : 
this.uniqueFieldCombinations) {
-                               FieldSet newFieldCombo = new FieldSet();
-                               for (Integer sourceField : fieldCombo) {
-                                       FieldSet targetField = 
props.getForwardingTargetFields(input, sourceField);
-
-                                       if (targetField == null || 
targetField.size() == 0) {
-                                               newFieldCombo = null;
-                                               break;
-                                       } else {
-                                               // use any field of target 
fields for now.  We should use something like field equivalence sets in the 
future.
-                                               if(targetField.size() > 1) {
-                                                       LOG.warn("Found that a 
field is forwarded to more than one target field in " +
-                                                                       
"semantic forwarded field information. Will only use the field with the lowest 
index.");
-                                               }
-                                               newFieldCombo = 
newFieldCombo.addField(targetField.toArray()[0]);
-                                       }
-                               }
-                               if (newFieldCombo != null) {
-                                       
newUniqueFieldCombinations.add(newFieldCombo);
-                               }
-                       }
-                       if(!newUniqueFieldCombinations.isEmpty()) {
-                               gp.uniqueFieldCombinations = 
newUniqueFieldCombinations;
-                       }
-               }
-
-               return gp;
-       }
-
-
-       public void parameterizeChannel(Channel channel, boolean 
globalDopChange,
-                                                                       
ExecutionMode exchangeMode, boolean breakPipeline) {
-
-               ShipStrategyType shipType;
-               FieldList partitionKeys;
-               boolean[] sortDirection;
-               Partitioner<?> partitioner;
-
-               switch (this.partitioning) {
-                       case RANDOM_PARTITIONED:
-                               shipType = globalDopChange ? 
ShipStrategyType.PARTITION_RANDOM : ShipStrategyType.FORWARD;
-                               partitionKeys = null;
-                               sortDirection = null;
-                               partitioner = null;
-                               break;
-
-                       case FULL_REPLICATION:
-                               shipType = ShipStrategyType.BROADCAST;
-                               partitionKeys = null;
-                               sortDirection = null;
-                               partitioner = null;
-                               break;
-
-                       case ANY_PARTITIONING:
-                       case HASH_PARTITIONED:
-                               shipType = ShipStrategyType.PARTITION_HASH;
-                               partitionKeys = 
Utils.createOrderedFromSet(this.partitioningFields);
-                               sortDirection = null;
-                               partitioner = null;
-                               break;
-
-                       case RANGE_PARTITIONED:
-                               shipType = ShipStrategyType.PARTITION_RANGE;
-                               partitionKeys = 
this.ordering.getInvolvedIndexes();
-                               sortDirection = 
this.ordering.getFieldSortDirections();
-                               partitioner = null;
-                               break;
-
-                       case FORCED_REBALANCED:
-                               shipType = ShipStrategyType.PARTITION_RANDOM;
-                               partitionKeys = null;
-                               sortDirection = null;
-                               partitioner = null;
-                               break;
-
-                       case CUSTOM_PARTITIONING:
-                               shipType = ShipStrategyType.PARTITION_CUSTOM;
-                               partitionKeys = this.partitioningFields;
-                               sortDirection = null;
-                               partitioner = this.customPartitioner;
-                               break;
-
-                       default:
-                               throw new CompilerException("Unsupported 
partitioning strategy");
-               }
-
-               DataExchangeMode exMode = DataExchangeMode.select(exchangeMode, 
shipType, breakPipeline);
-               channel.setShipStrategy(shipType, partitionKeys, sortDirection, 
partitioner, exMode);
-       }
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public int hashCode() {
-               final int prime = 31;
-               int result = 1;
-               result = prime * result + ((partitioning == null) ? 0 : 
partitioning.ordinal());
-               result = prime * result + ((partitioningFields == null) ? 0 : 
partitioningFields.hashCode());
-               result = prime * result + ((ordering == null) ? 0 : 
ordering.hashCode());
-               return result;
-       }
-
-       @Override
-       public boolean equals(Object obj) {
-               if (obj != null && obj instanceof GlobalProperties) {
-                       final GlobalProperties other = (GlobalProperties) obj;
-                       return (this.partitioning == other.partitioning)
-                               && (this.ordering == other.ordering || 
(this.ordering != null && this.ordering.equals(other.ordering)))
-                               && (this.partitioningFields == 
other.partitioningFields || 
-                                                       
(this.partitioningFields != null && 
this.partitioningFields.equals(other.partitioningFields)))
-                               && (this.uniqueFieldCombinations == 
other.uniqueFieldCombinations || 
-                                                       
(this.uniqueFieldCombinations != null && 
this.uniqueFieldCombinations.equals(other.uniqueFieldCombinations)));
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public String toString() {
-               final StringBuilder bld = new StringBuilder(
-                       "GlobalProperties [partitioning=" + partitioning + 
-                       (this.partitioningFields == null ? "" : ", on fields " 
+ this.partitioningFields) + 
-                       (this.ordering == null ? "" : ", with ordering " + 
this.ordering));
-               
-               if (this.uniqueFieldCombinations == null) {
-                       bld.append(']');
-               } else {
-                       bld.append(" - Unique field groups: ");
-                       bld.append(this.uniqueFieldCombinations);
-                       bld.append(']');
-               }
-               return bld.toString();
-       }
-
-       @Override
-       public GlobalProperties clone() {
-               final GlobalProperties newProps = new GlobalProperties();
-               newProps.partitioning = this.partitioning;
-               newProps.partitioningFields = this.partitioningFields;
-               newProps.ordering = this.ordering;
-               newProps.customPartitioner = this.customPartitioner;
-               newProps.uniqueFieldCombinations = this.uniqueFieldCombinations 
== null ? null : new HashSet<FieldSet>(this.uniqueFieldCombinations);
-               return newProps;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public static GlobalProperties combine(GlobalProperties gp1, 
GlobalProperties gp2) {
-               if (gp1.isFullyReplicated()) {
-                       if (gp2.isFullyReplicated()) {
-                               return new GlobalProperties();
-                       } else {
-                               return gp2;
-                       }
-               } else if (gp2.isFullyReplicated()) {
-                       return gp1;
-               } else if (gp1.ordering != null) {
-                       return gp1;
-               } else if (gp2.ordering != null) {
-                       return gp2;
-               } else if (gp1.partitioningFields != null) {
-                       return gp1;
-               } else if (gp2.partitioningFields != null) {
-                       return gp2;
-               } else if (gp1.uniqueFieldCombinations != null) {
-                       return gp1;
-               } else if (gp2.uniqueFieldCombinations != null) {
-                       return gp2;
-               } else if (gp1.getPartitioning().isPartitioned()) {
-                       return gp1;
-               } else if (gp2.getPartitioning().isPartitioned()) {
-                       return gp2;
-               } else {
-                       return gp1;
-               }
-       }
-}

Reply via email to