http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java
new file mode 100644
index 0000000..df05b64
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.BulkPartialSolutionNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.runtime.operators.DamBehavior;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Plan candidate node for partial solution of a bulk iteration.
+ */
+public class BulkPartialSolutionPlanNode extends PlanNode {
+       
+       private static final Costs NO_COSTS = new Costs();
+       
+       private BulkIterationPlanNode containingIterationNode;
+       
+       private Channel initialInput;
+       
+       public Object postPassHelper;
+       
+       
+       public BulkPartialSolutionPlanNode(BulkPartialSolutionNode template, 
String nodeName,
+                       GlobalProperties gProps, LocalProperties lProps,
+                       Channel initialInput)
+       {
+               super(template, nodeName, DriverStrategy.NONE);
+               
+               this.globalProps = gProps;
+               this.localProps = lProps;
+               this.initialInput = initialInput;
+               
+               // the partial solution does not cost anything
+               this.nodeCosts = NO_COSTS;
+               this.cumulativeCosts = NO_COSTS;
+               
+               if (initialInput.getSource().branchPlan != null && 
initialInput.getSource().branchPlan.size() > 0) {
+                       if (this.branchPlan == null) {
+                               this.branchPlan = new HashMap<OptimizerNode, 
PlanNode>();
+                       }
+                       
+                       
this.branchPlan.putAll(initialInput.getSource().branchPlan);
+               }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public BulkPartialSolutionNode getPartialSolutionNode() {
+               return (BulkPartialSolutionNode) this.template;
+       }
+       
+       public BulkIterationPlanNode getContainingIterationNode() {
+               return this.containingIterationNode;
+       }
+       
+       public void setContainingIterationNode(BulkIterationPlanNode 
containingIterationNode) {
+               this.containingIterationNode = containingIterationNode;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       
+       @Override
+       public void accept(Visitor<PlanNode> visitor) {
+               if (visitor.preVisit(this)) {
+                       visitor.postVisit(this);
+               }
+       }
+
+       @Override
+       public Iterable<PlanNode> getPredecessors() {
+               return Collections.<PlanNode>emptyList();
+       }
+
+       @Override
+       public Iterable<Channel> getInputs() {
+               return Collections.<Channel>emptyList();
+       }
+
+       @Override
+       public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+               if (source == this) {
+                       return FOUND_SOURCE;
+               }
+               SourceAndDamReport res = 
this.initialInput.getSource().hasDamOnPathDownTo(source);
+               if (res == FOUND_SOURCE_AND_DAM) {
+                       return FOUND_SOURCE_AND_DAM;
+               }
+               else if (res == FOUND_SOURCE) {
+                       return (this.initialInput.getLocalStrategy().dams() || 
+                                       
this.initialInput.getTempMode().breaksPipeline() ||
+                                       getDriverStrategy().firstDam() == 
DamBehavior.FULL_DAM) ?
+                               FOUND_SOURCE_AND_DAM : FOUND_SOURCE;
+               }
+               else {
+                       return NOT_FOUND;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
new file mode 100644
index 0000000..875d1c3
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.EstimateProvider;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plandump.DumpableConnection;
+import org.apache.flink.optimizer.util.Utils;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+
+/**
+ * A Channel represents the result produced by an operator and the data 
exchange
+ * before the consumption by the target operator.
+ *
+ * The channel defines and tracks various properties and characteristics of the
+ * data set and data exchange.
+ *
+ * Data set characteristics:
+ * <ul>
+ *     <li>The "global properties" of the data, i.e., how the data is 
distributed across
+ *         partitions</li>
+ *     <li>The "required global properties" of the data, i.e., the global 
properties that, if absent,
+ *         would cause the program to return a wrong result.</li>
+ *     <li>The "local properties" of the data, i.e., how the data is organized 
within a partition</li>
+ *     <li>The "required local properties" of the data, i.e., the local 
properties that, if absent,
+ *         would cause the program to return a wrong result.</li>
+ * </ul>
+ *
+ * Data exchange parameters:
+ * <ul>
+ *     <li>The "ship strategy", i.e., whether to forward the data, shuffle it, 
broadcast it, ...</li>
+ *     <li>The "ship keys", which are the positions of the key fields in the 
exchanged records.</li>
+ *     <li>The "data exchange mode", which defines whether to pipeline or 
batch the exchange</li>
+ *     <li>Several more...</li>
+ * </ul>
+ */
+public class Channel implements EstimateProvider, Cloneable, 
DumpableConnection<PlanNode> {
+       
+       private PlanNode source;
+       
+       private PlanNode target;
+
+       private ShipStrategyType shipStrategy = ShipStrategyType.NONE;
+
+       private DataExchangeMode dataExchangeMode;
+       
+       private LocalStrategy localStrategy = LocalStrategy.NONE;
+       
+       private FieldList shipKeys;
+       
+       private FieldList localKeys;
+       
+       private boolean[] shipSortOrder;
+       
+       private boolean[] localSortOrder;
+       
+       private RequestedGlobalProperties requiredGlobalProps;
+       
+       private RequestedLocalProperties requiredLocalProps;
+       
+       private GlobalProperties globalProps;
+       
+       private LocalProperties localProps;
+       
+       private TypeSerializerFactory<?> serializer;
+       
+       private TypeComparatorFactory<?> shipStrategyComparator;
+       
+       private TypeComparatorFactory<?> localStrategyComparator;
+       
+       private DataDistribution dataDistribution;
+       
+       private Partitioner<?> partitioner;
+       
+       private TempMode tempMode;
+       
+       private double relativeTempMemory;
+       
+       private double relativeMemoryLocalStrategy;
+       
+       private int replicationFactor = 1;
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public Channel(PlanNode sourceNode) {
+               this(sourceNode, null);
+       }
+       
+       public Channel(PlanNode sourceNode, TempMode tempMode) {
+               this.source = sourceNode;
+               this.tempMode = (tempMode == null ? TempMode.NONE : tempMode);
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //                                         Accessors
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * Gets the source of this Channel.
+        *
+        * @return The source.
+        */
+       @Override
+       public PlanNode getSource() {
+               return this.source;
+       }
+       
+       /**
+        * Sets the target of this Channel.
+        *
+        * @param target The target.
+        */
+       public void setTarget(PlanNode target) {
+               this.target = target;
+       }
+       
+       /**
+        * Gets the target of this Channel.
+        *
+        * @return The target.
+        */
+       public PlanNode getTarget() {
+               return this.target;
+       }
+
+       public void setShipStrategy(ShipStrategyType strategy, DataExchangeMode 
dataExchangeMode) {
+               setShipStrategy(strategy, null, null, null, dataExchangeMode);
+       }
+       
+       public void setShipStrategy(ShipStrategyType strategy, FieldList keys, 
DataExchangeMode dataExchangeMode) {
+               setShipStrategy(strategy, keys, null, null, dataExchangeMode);
+       }
+       
+       public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
+                                                               boolean[] 
sortDirection, DataExchangeMode dataExchangeMode) {
+               setShipStrategy(strategy, keys, sortDirection, null, 
dataExchangeMode);
+       }
+       
+       public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
+                                                               Partitioner<?> 
partitioner, DataExchangeMode dataExchangeMode) {
+               setShipStrategy(strategy, keys, null, partitioner, 
dataExchangeMode);
+       }
+       
+       public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
+                                                               boolean[] 
sortDirection, Partitioner<?> partitioner,
+                                                               
DataExchangeMode dataExchangeMode) {
+               this.shipStrategy = strategy;
+               this.shipKeys = keys;
+               this.shipSortOrder = sortDirection;
+               this.partitioner = partitioner;
+               this.dataExchangeMode = dataExchangeMode;
+               this.globalProps = null;                // reset the global 
properties
+       }
+
+       /**
+        * Gets the data exchange mode (batch / streaming) to use for the data
+        * exchange of this channel.
+        *
+        * @return The data exchange mode of this channel.
+        */
+       public DataExchangeMode getDataExchangeMode() {
+               return dataExchangeMode;
+       }
+
+       public ShipStrategyType getShipStrategy() {
+               return this.shipStrategy;
+       }
+       
+       public FieldList getShipStrategyKeys() {
+               return this.shipKeys;
+       }
+       
+       public boolean[] getShipStrategySortOrder() {
+               return this.shipSortOrder;
+       }
+       
+       public void setLocalStrategy(LocalStrategy strategy) {
+               setLocalStrategy(strategy, null, null);
+       }
+       
+       public void setLocalStrategy(LocalStrategy strategy, FieldList keys, 
boolean[] sortDirection) {
+               this.localStrategy = strategy;
+               this.localKeys = keys;
+               this.localSortOrder = sortDirection;
+               this.localProps = null;         // reset the local properties
+       }
+       
+       public LocalStrategy getLocalStrategy() {
+               return this.localStrategy;
+       }
+       
+       public FieldList getLocalStrategyKeys() {
+               return this.localKeys;
+       }
+       
+       public boolean[] getLocalStrategySortOrder() {
+               return this.localSortOrder;
+       }
+       
+       public void setDataDistribution(DataDistribution dataDistribution) {
+               this.dataDistribution = dataDistribution;
+       }
+       
+       public DataDistribution getDataDistribution() {
+               return this.dataDistribution;
+       }
+       
+       public Partitioner<?> getPartitioner() {
+               return partitioner;
+       }
+       
+       public TempMode getTempMode() {
+               return this.tempMode;
+       }
+
+       /**
+        * Sets the temp mode of the connection.
+        * 
+        * @param tempMode
+        *        The temp mode of the connection.
+        */
+       public void setTempMode(TempMode tempMode) {
+               this.tempMode = tempMode;
+       }
+       
+       /**
+        * Gets the memory for materializing the channel's result from this 
Channel.
+        *
+        * @return The temp memory.
+        */
+       public double getRelativeTempMemory() {
+               return this.relativeTempMemory;
+       }
+       
+       /**
+        * Sets the memory for materializing the channel's result from this 
Channel.
+        *
+        * @param relativeTempMemory The memory for materialization.
+        */
+       public void setRelativeTempMemory(double relativeTempMemory) {
+               this.relativeTempMemory = relativeTempMemory;
+       }
+       
+       /**
+        * Sets the replication factor of the connection.
+        * 
+        * @param factor The replication factor of the connection.
+        */
+       public void setReplicationFactor(int factor) {
+               this.replicationFactor = factor;
+       }
+       
+       /**
+        * Returns the replication factor of the connection.
+        * 
+        * @return The replication factor of the connection.
+        */
+       public int getReplicationFactor() {
+               return this.replicationFactor;
+       }
+       
+       /**
+        * Gets the serializer from this Channel.
+        *
+        * @return The serializer.
+        */
+       public TypeSerializerFactory<?> getSerializer() {
+               return serializer;
+       }
+       
+       /**
+        * Sets the serializer for this Channel.
+        *
+        * @param serializer The serializer to set.
+        */
+       public void setSerializer(TypeSerializerFactory<?> serializer) {
+               this.serializer = serializer;
+       }
+       
+       /**
+        * Gets the ship strategy comparator from this Channel.
+        *
+        * @return The ship strategy comparator.
+        */
+       public TypeComparatorFactory<?> getShipStrategyComparator() {
+               return shipStrategyComparator;
+       }
+       
+       /**
+        * Sets the ship strategy comparator for this Channel.
+        *
+        * @param shipStrategyComparator The ship strategy comparator to set.
+        */
+       public void setShipStrategyComparator(TypeComparatorFactory<?> 
shipStrategyComparator) {
+               this.shipStrategyComparator = shipStrategyComparator;
+       }
+       
+       /**
+        * Gets the local strategy comparator from this Channel.
+        *
+        * @return The local strategy comparator.
+        */
+       public TypeComparatorFactory<?> getLocalStrategyComparator() {
+               return localStrategyComparator;
+       }
+       
+       /**
+        * Sets the local strategy comparator for this Channel.
+        *
+        * @param localStrategyComparator The local strategy comparator to set.
+        */
+       public void setLocalStrategyComparator(TypeComparatorFactory<?> 
localStrategyComparator) {
+               this.localStrategyComparator = localStrategyComparator;
+       }
+       
+       public double getRelativeMemoryLocalStrategy() {
+               return relativeMemoryLocalStrategy;
+       }
+       
+       public void setRelativeMemoryLocalStrategy(double 
relativeMemoryLocalStrategy) {
+               this.relativeMemoryLocalStrategy = relativeMemoryLocalStrategy;
+       }
+       
+       public boolean isOnDynamicPath() {
+               return this.source.isOnDynamicPath();
+       }
+       
+       public int getCostWeight() {
+               return this.source.getCostWeight();
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //                                Statistic Estimates
+       // 
--------------------------------------------------------------------------------------------
+       
+
+       @Override
+       public long getEstimatedOutputSize() {
+               long estimate = this.source.template.getEstimatedOutputSize();
+               return estimate < 0 ? estimate : estimate * 
this.replicationFactor;
+       }
+
+       @Override
+       public long getEstimatedNumRecords() {
+               long estimate =  this.source.template.getEstimatedNumRecords();
+               return estimate < 0 ? estimate : estimate * 
this.replicationFactor;
+       }
+       
+       @Override
+       public float getEstimatedAvgWidthPerOutputRecord() {
+               return 
this.source.template.getEstimatedAvgWidthPerOutputRecord();
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //                                Data Property Handling
+       // 
--------------------------------------------------------------------------------------------
+       
+
+       public RequestedGlobalProperties getRequiredGlobalProps() {
+               return requiredGlobalProps;
+       }
+
+       public void setRequiredGlobalProps(RequestedGlobalProperties 
requiredGlobalProps) {
+               this.requiredGlobalProps = requiredGlobalProps;
+       }
+
+       public RequestedLocalProperties getRequiredLocalProps() {
+               return requiredLocalProps;
+       }
+
+       public void setRequiredLocalProps(RequestedLocalProperties 
requiredLocalProps) {
+               this.requiredLocalProps = requiredLocalProps;
+       }
+
+       public GlobalProperties getGlobalProperties() {
+               if (this.globalProps == null) {
+                       this.globalProps = 
this.source.getGlobalProperties().clone();
+                       switch (this.shipStrategy) {
+                               case BROADCAST:
+                                       
this.globalProps.clearUniqueFieldCombinations();
+                                       this.globalProps.setFullyReplicated();
+                                       break;
+                               case PARTITION_HASH:
+                                       
this.globalProps.setHashPartitioned(this.shipKeys);
+                                       break;
+                               case PARTITION_RANGE:
+                                       
this.globalProps.setRangePartitioned(Utils.createOrdering(this.shipKeys, 
this.shipSortOrder));
+                                       break;
+                               case FORWARD:
+                                       break;
+                               case PARTITION_RANDOM:
+                                       this.globalProps.reset();
+                                       break;
+                               case PARTITION_FORCED_REBALANCE:
+                                       this.globalProps.setForcedRebalanced();
+                                       break;
+                               case PARTITION_CUSTOM:
+                                       
this.globalProps.setCustomPartitioned(this.shipKeys, this.partitioner);
+                                       break;
+                               case NONE:
+                                       throw new CompilerException("Cannot 
produce GlobalProperties before ship strategy is set.");
+                       }
+               }
+               
+               return this.globalProps;
+       }
+       
+       public LocalProperties getLocalProperties() {
+               if (this.localProps == null) {
+                       computeLocalPropertiesAfterShippingOnly();
+                       switch (this.localStrategy) {
+                               case NONE:
+                                       break;
+                               case SORT:
+                               case COMBININGSORT:
+                                       this.localProps = 
LocalProperties.forOrdering(Utils.createOrdering(this.localKeys, 
this.localSortOrder));
+                                       break;
+                               default:
+                                       throw new 
CompilerException("Unsupported local strategy for channel.");
+                       }
+               }
+               
+               return this.localProps;
+       }
+       
+       private void computeLocalPropertiesAfterShippingOnly() {
+               switch (this.shipStrategy) {
+                       case BROADCAST:
+                       case PARTITION_HASH:
+                       case PARTITION_CUSTOM:
+                       case PARTITION_RANGE:
+                       case PARTITION_RANDOM:
+                       case PARTITION_FORCED_REBALANCE:
+                               this.localProps = new LocalProperties();
+                               break;
+                       case FORWARD:
+                               this.localProps = 
this.source.getLocalProperties();
+                               break;
+                       case NONE:
+                               throw new CompilerException("ShipStrategy has 
not yet been set.");
+                       default:
+                               throw new CompilerException("Unknown 
ShipStrategy.");
+               }
+       }
+       
+       public void adjustGlobalPropertiesForFullParallelismChange() {
+               if (this.shipStrategy == null || this.shipStrategy == 
ShipStrategyType.NONE) {
+                       throw new IllegalStateException("Cannot adjust channel 
for degree of parallelism " +
+                                       "change before the ship strategy is 
set.");
+               }
+               
+               // make sure the properties are acquired
+               if (this.globalProps == null) {
+                       getGlobalProperties();
+               }
+               
+               // some strategies globally reestablish properties
+               switch (this.shipStrategy) {
+               case FORWARD:
+                       throw new CompilerException("Cannot use FORWARD 
strategy between operations " +
+                                       "with different number of parallel 
instances.");
+               case NONE: // excluded by sanity check. left here for 
verification check completion
+               case BROADCAST:
+               case PARTITION_HASH:
+               case PARTITION_RANGE:
+               case PARTITION_RANDOM:
+               case PARTITION_FORCED_REBALANCE:
+               case PARTITION_CUSTOM:
+                       return;
+               }
+               throw new CompilerException("Unrecognized Ship Strategy Type: " 
+ this.shipStrategy);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * Utility method used while swapping binary union nodes for n-ary 
union nodes.
+        */
+       public void swapUnionNodes(PlanNode newUnionNode) {
+               if (!(this.source instanceof BinaryUnionPlanNode)) {
+                       throw new IllegalStateException();
+               } else {
+                       this.source = newUnionNode;
+               }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public int getMaxDepth() {
+               return this.source.getOptimizerNode().getMaxDepth() + 1;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               return "Channel (" + this.source + (this.target == null ? ')' : 
") -> (" + this.target + ')') +
+                               '[' + this.shipStrategy + "] [" + 
this.localStrategy + "] " +
+                               (this.tempMode == null || this.tempMode == 
TempMode.NONE ? "{NO-TEMP}" : this.tempMode);
+       }
+
+       @Override
+       public Channel clone() {
+               try {
+                       return (Channel) super.clone();
+               } catch (CloneNotSupportedException cnsex) {
+                       throw new RuntimeException(cnsex);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
new file mode 100644
index 0000000..01c56dd
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.runtime.operators.DamBehavior;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Visitor;
+
+/**
+ *
+ */
+public class DualInputPlanNode extends PlanNode {
+       
+       protected final Channel input1;
+       protected final Channel input2;
+       
+       protected final FieldList keys1;
+       protected final FieldList keys2;
+       
+       protected final boolean[] sortOrders;
+       
+       private TypeComparatorFactory<?> comparator1;
+       private TypeComparatorFactory<?> comparator2;
+       private TypePairComparatorFactory<?, ?> pairComparator;
+       
+       public Object postPassHelper1;
+       public Object postPassHelper2;
+       
+       // 
--------------------------------------------------------------------------------------------
+
+       public DualInputPlanNode(OptimizerNode template, String nodeName, 
Channel input1, Channel input2, DriverStrategy diverStrategy) {
+               this(template, nodeName, input1, input2, diverStrategy, null, 
null, null);
+       }
+       
+       public DualInputPlanNode(OptimizerNode template, String nodeName, 
Channel input1, Channel input2,
+                       DriverStrategy diverStrategy, FieldList 
driverKeyFields1, FieldList driverKeyFields2)
+       {
+               this(template, nodeName, input1, input2, diverStrategy, 
driverKeyFields1, driverKeyFields2,
+                                                                       
SingleInputPlanNode.getTrueArray(driverKeyFields1.size()));
+       }
+       
+       public DualInputPlanNode(OptimizerNode template, String nodeName, 
Channel input1, Channel input2, DriverStrategy diverStrategy,
+                       FieldList driverKeyFields1, FieldList driverKeyFields2, 
boolean[] driverSortOrders)
+       {
+               super(template, nodeName, diverStrategy);
+               this.input1 = input1;
+               this.input2 = input2;
+               this.keys1 = driverKeyFields1;
+               this.keys2 = driverKeyFields2;
+               this.sortOrders = driverSortOrders;
+               
+               if (this.input1.getShipStrategy() == 
ShipStrategyType.BROADCAST) {
+                       this.input1.setReplicationFactor(getParallelism());
+               }
+               if (this.input2.getShipStrategy() == 
ShipStrategyType.BROADCAST) {
+                       this.input2.setReplicationFactor(getParallelism());
+               }
+               
+               mergeBranchPlanMaps(input1.getSource(), input2.getSource());
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       
+       public TwoInputNode getTwoInputNode() {
+               if (this.template instanceof TwoInputNode) {
+                       return (TwoInputNode) this.template;
+               } else {
+                       throw new RuntimeException();
+               }
+       }
+       
+       public FieldList getKeysForInput1() {
+               return this.keys1;
+       }
+       
+       public FieldList getKeysForInput2() {
+               return this.keys2;
+       }
+       
+       public boolean[] getSortOrders() {
+               return this.sortOrders;
+       }
+       
+       public TypeComparatorFactory<?> getComparator1() {
+               return this.comparator1;
+       }
+       
+       public TypeComparatorFactory<?> getComparator2() {
+               return this.comparator2;
+       }
+       
+       public void setComparator1(TypeComparatorFactory<?> comparator) {
+               this.comparator1 = comparator;
+       }
+       
+       public void setComparator2(TypeComparatorFactory<?> comparator) {
+               this.comparator2 = comparator;
+       }
+       
+       public TypePairComparatorFactory<?, ?> getPairComparator() {
+               return this.pairComparator;
+       }
+       
+       public void setPairComparator(TypePairComparatorFactory<?, ?> 
comparator) {
+               this.pairComparator = comparator;
+       }
+       
+       /**
+        * Gets the first input channel to this node.
+        * 
+        * @return The first input channel to this node.
+        */
+       public Channel getInput1() {
+               return this.input1;
+       }
+       
+       /**
+        * Gets the second input channel to this node.
+        * 
+        * @return The second input channel to this node.
+        */
+       public Channel getInput2() {
+               return this.input2;
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+
+       @Override
+       public void accept(Visitor<PlanNode> visitor) {
+               if (visitor.preVisit(this)) {
+                       this.input1.getSource().accept(visitor);
+                       this.input2.getSource().accept(visitor);
+                       
+                       for (Channel broadcastInput : getBroadcastInputs()) {
+                               broadcastInput.getSource().accept(visitor);
+                       }
+                       
+                       visitor.postVisit(this);
+               }
+       }
+       
+
+       @Override
+       public Iterable<PlanNode> getPredecessors() {
+               if (getBroadcastInputs() == null || 
getBroadcastInputs().isEmpty()) {
+                       return Arrays.asList(this.input1.getSource(), 
this.input2.getSource());
+               } else {
+                       List<PlanNode> preds = new ArrayList<PlanNode>();
+                       
+                       preds.add(input1.getSource());
+                       preds.add(input2.getSource());
+
+                       for (Channel c : getBroadcastInputs()) {
+                               preds.add(c.getSource());
+                       }
+                       
+                       return preds;
+               }
+       }
+
+       @Override
+       public Iterable<Channel> getInputs() {
+               return Arrays.asList(this.input1, this.input2);
+       }
+
+
+       @Override
+       public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+               if (source == this) {
+                       return FOUND_SOURCE;
+               }
+               
+               // check first input
+               SourceAndDamReport res1 = 
this.input1.getSource().hasDamOnPathDownTo(source);
+               if (res1 == FOUND_SOURCE_AND_DAM) {
+                       return FOUND_SOURCE_AND_DAM;
+               }
+               else if (res1 == FOUND_SOURCE) {
+                       if (this.input1.getLocalStrategy().dams() || 
this.input1.getTempMode().breaksPipeline() ||
+                                       getDriverStrategy().firstDam() == 
DamBehavior.FULL_DAM) {
+                               return FOUND_SOURCE_AND_DAM;
+                       } else {
+                               return FOUND_SOURCE;
+                       }
+               }
+               else {
+                       SourceAndDamReport res2 = 
this.input2.getSource().hasDamOnPathDownTo(source);
+                       if (res2 == FOUND_SOURCE_AND_DAM) {
+                               return FOUND_SOURCE_AND_DAM;
+                       }
+                       else if (res2 == FOUND_SOURCE) {
+                               if (this.input2.getLocalStrategy().dams() || 
this.input2.getTempMode().breaksPipeline() ||
+                                               getDriverStrategy().secondDam() 
== DamBehavior.FULL_DAM) {
+                                       return FOUND_SOURCE_AND_DAM;
+                               } else {
+                                       return FOUND_SOURCE;
+                               }
+                       }
+                       else {
+                               // NOT_FOUND
+                               // check the broadcast inputs
+                               
+                               for (NamedChannel nc : getBroadcastInputs()) {
+                                       SourceAndDamReport bcRes = 
nc.getSource().hasDamOnPathDownTo(source);
+                                       if (bcRes != NOT_FOUND) {
+                                               // broadcast inputs are always 
dams
+                                               return FOUND_SOURCE_AND_DAM;
+                                       }
+                               }
+                               return NOT_FOUND;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java
new file mode 100644
index 0000000..d146c83
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+/**
+ * A common interface for compiled Flink plans for both batch and streaming
+ * processing programs.
+ * 
+ */
+public interface FlinkPlan {
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java
new file mode 100644
index 0000000..38f76b2
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import org.apache.flink.optimizer.dag.IterationNode;
+import org.apache.flink.util.Visitor;
+
+/**
+ *
+ */
+public interface IterationPlanNode {
+       
+       void acceptForStepFunction(Visitor<PlanNode> visitor);
+       
+       IterationNode getIterationNode();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java
new file mode 100644
index 0000000..3650eea
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.BinaryUnionNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.IterableIterator;
+import org.apache.flink.util.Visitor;
+
+/**
+ * A union operation over multiple inputs (2 or more).
+ */
+public class NAryUnionPlanNode extends PlanNode {
+       
+       private final List<Channel> inputs;
+       
+       /**
+        * @param template
+        */
+       public NAryUnionPlanNode(BinaryUnionNode template, List<Channel> 
inputs, GlobalProperties gProps,
+                       Costs cumulativeCosts)
+       {
+               super(template, "Union", DriverStrategy.NONE);
+               
+               this.inputs = inputs;
+               this.globalProps = gProps;
+               this.localProps = new LocalProperties();
+               this.nodeCosts = new Costs();
+               this.cumulativeCosts = cumulativeCosts;
+       }
+
+       @Override
+       public void accept(Visitor<PlanNode> visitor) {
+               visitor.preVisit(this);
+               for (Channel c : this.inputs) {
+                       c.getSource().accept(visitor);
+               }
+               visitor.postVisit(this);
+       }
+       
+       public List<Channel> getListOfInputs() {
+               return this.inputs;
+       }
+
+       @Override
+       public Iterable<Channel> getInputs() {
+               return Collections.unmodifiableList(this.inputs);
+       }
+
+       @Override
+       public Iterable<PlanNode> getPredecessors() {
+               final Iterator<Channel> channels = this.inputs.iterator();
+               return new IterableIterator<PlanNode>() {
+
+                       @Override
+                       public boolean hasNext() {
+                               return channels.hasNext();
+                       }
+
+                       @Override
+                       public PlanNode next() {
+                               return channels.next().getSource();
+                       }
+
+                       @Override
+                       public void remove() {
+                               throw new UnsupportedOperationException();
+                       }
+                       
+                       @Override
+                       public Iterator<PlanNode> iterator() {
+                               return this;
+                       }
+               };
+       }
+
+       @Override
+       public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+               // this node is used after the plan enumeration. consequently, 
this will never be invoked here
+               throw new UnsupportedOperationException();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java
new file mode 100644
index 0000000..da97e61
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import org.apache.flink.optimizer.dag.TempMode;
+
+public class NamedChannel extends Channel {
+
+       private final String name;
+
+       /**
+        * Initializes NamedChannel.
+        * 
+        * @param sourceNode
+        */
+       public NamedChannel(String name, PlanNode sourceNode) {
+               super(sourceNode);
+               this.name = name;
+       }
+
+       public NamedChannel(String name, PlanNode sourceNode, TempMode 
tempMode) {
+               super(sourceNode, tempMode);
+               this.name = name;
+       }
+
+       public String getName() {
+               return this.name;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
new file mode 100644
index 0000000..d56be87
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import java.util.Collection;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.util.Visitable;
+import org.apache.flink.util.Visitor;
+
+/**
+ * The execution plan generated by the Optimizer. It contains {@link PlanNode}s
+ * and {@link Channel}s that describe exactly how the program should be 
executed.
+ * It defines all ship strategies (local pipe, shuffle, broadcast, rebalance), 
all
+ * operator strategies (sorting-merge join, hash join, sorted grouping, ...),
+ * and the data exchange modes (batched, pipelined).
+ */
+public class OptimizedPlan implements FlinkPlan, Visitable<PlanNode>  {
+       
+       /** The data sources in the plan. */
+       private final Collection<SourcePlanNode> dataSources;
+
+       /** The data sinks in the plan. */
+       private final Collection<SinkPlanNode> dataSinks;
+
+       /** All nodes in the optimizer plan. */
+       private final Collection<PlanNode> allNodes;
+       
+       /** The original program. */
+       private final Plan originalProgram;
+
+       /** Name of the job */
+       private final String jobName;
+
+       /**
+        * Creates a new instance of this optimizer plan container. The plan is 
given and fully
+        * described by the data sources, sinks and the collection of all nodes.
+        * 
+        * @param sources The data sources.
+        * @param sinks The data sinks.
+        * @param allNodes A collection containing all nodes in the plan.
+        * @param jobName The name of the program
+        */
+       public OptimizedPlan(Collection<SourcePlanNode> sources, 
Collection<SinkPlanNode> sinks,
+                       Collection<PlanNode> allNodes, String jobName, Plan 
programPlan)
+       {
+               this.dataSources = sources;
+               this.dataSinks = sinks;
+               this.allNodes = allNodes;
+               this.jobName = jobName;
+               this.originalProgram = programPlan;
+       }
+
+       /**
+        * Gets the data sources from this OptimizedPlan.
+        * 
+        * @return The data sources.
+        */
+       public Collection<SourcePlanNode> getDataSources() {
+               return dataSources;
+       }
+
+       /**
+        * Gets the data sinks from this OptimizedPlan.
+        * 
+        * @return The data sinks.
+        */
+       public Collection<SinkPlanNode> getDataSinks() {
+               return dataSinks;
+       }
+
+       /**
+        * Gets all the nodes from this OptimizedPlan.
+        * 
+        * @return All nodes.
+        */
+       public Collection<PlanNode> getAllNodes() {
+               return allNodes;
+       }
+
+       /**
+        * Returns the name of the program.
+        * 
+        * @return The name of the program.
+        */
+       public String getJobName() {
+               return this.jobName;
+       }
+       
+       /**
+        * Gets the original program plan from which this optimized plan was 
created.
+        * 
+        * @return The original program plan.
+        */
+       public Plan getOriginalPactPlan() {
+               return this.originalProgram;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Applies the given visitor top down to all nodes, starting at the 
sinks.
+        * 
+        * @param visitor
+        *        The visitor to apply to the nodes in this plan.
+        * @see 
org.apache.flink.util.Visitable#accept(org.apache.flink.util.Visitor)
+        */
+       @Override
+       public void accept(Visitor<PlanNode> visitor) {
+               for (SinkPlanNode node : this.dataSinks) {
+                       node.accept(visitor);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
new file mode 100644
index 0000000..6f634fb
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.OptimizerNode.UnclosedBranchDescriptor;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.plandump.DumpableConnection;
+import org.apache.flink.optimizer.plandump.DumpableNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.util.Visitable;
+
+/**
+ * The representation of a data exchange between to operators. The data 
exchange can realize a shipping strategy, 
+ * which established global properties, and a local strategy, which 
establishes local properties.
+ * <p>
+ * Because we currently deal only with plans where the operator order is 
fixed, many properties are equal
+ * among candidates and are determined prior to the enumeration (such as for 
example constant/dynamic path membership).
+ * Hence, many methods will delegate to the {@code OptimizerNode} that 
represents the node this candidate was
+ * created for.
+ */
+public abstract class PlanNode implements Visitable<PlanNode>, 
DumpableNode<PlanNode> {
+       
+       protected final OptimizerNode template;
+       
+       protected final List<Channel> outChannels;
+       
+       private List<NamedChannel> broadcastInputs;
+       
+       private final String nodeName; 
+       
+       private DriverStrategy driverStrategy;  // The local strategy (sorting 
/ hashing, ...)
+       
+       protected LocalProperties localProps;                   // local 
properties of the data produced by this node
+
+       protected GlobalProperties globalProps;                 // global 
properties of the data produced by this node
+       
+       protected Map<OptimizerNode, PlanNode> branchPlan; // the actual plan 
alternative chosen at a branch point
+       
+       protected Costs nodeCosts;                                              
// the costs incurred by this node
+
+       protected Costs cumulativeCosts;                                        
// the cumulative costs of all operators in the sub-tree
+       
+       private double relativeMemoryPerSubTask;                                
        // the amount of memory dedicated to each task, in bytes
+       
+       private int parallelism;
+       
+       private boolean pFlag;                                                  
// flag for the internal pruning algorithm
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public PlanNode(OptimizerNode template, String nodeName, DriverStrategy 
strategy) {
+               this.outChannels = new ArrayList<Channel>(2);
+               this.broadcastInputs = new ArrayList<NamedChannel>();
+               this.template = template;
+               this.nodeName = nodeName;
+               this.driverStrategy = strategy;
+               
+               this.parallelism = template.getParallelism();
+
+               // check, if there is branch at this node. if yes, this 
candidate must be associated with
+               // the branching template node.
+               if (template.isBranching()) {
+                       this.branchPlan = new HashMap<OptimizerNode, 
PlanNode>(6);
+                       this.branchPlan.put(template, this);
+               }
+       }
+       
+       protected void mergeBranchPlanMaps(PlanNode pred1, PlanNode pred2) {
+               mergeBranchPlanMaps(pred1.branchPlan, pred2.branchPlan);
+       }
+       
+       protected void mergeBranchPlanMaps(Map<OptimizerNode, PlanNode> 
branchPlan1, Map<OptimizerNode, PlanNode> branchPlan2) {
+               // merge the branchPlan maps according the template's 
uncloseBranchesStack
+               if (this.template.hasUnclosedBranches()) {
+                       if (this.branchPlan == null) {
+                               this.branchPlan = new HashMap<OptimizerNode, 
PlanNode>(8);
+                       }
+       
+                       for (UnclosedBranchDescriptor uc : 
this.template.getOpenBranches()) {
+                               OptimizerNode brancher = uc.getBranchingNode();
+                               PlanNode selectedCandidate = null;
+       
+                               if (branchPlan1 != null) {
+                                       // predecessor 1 has branching 
children, see if it got the branch we are looking for
+                                       selectedCandidate = 
branchPlan1.get(brancher);
+                               }
+                               
+                               if (selectedCandidate == null && branchPlan2 != 
null) {
+                                       // predecessor 2 has branching 
children, see if it got the branch we are looking for
+                                       selectedCandidate = 
branchPlan2.get(brancher);
+                               }
+                               
+                               // it may be that the branch candidate is only 
found once the broadcast variables are set
+                               if (selectedCandidate != null) {
+                                       this.branchPlan.put(brancher, 
selectedCandidate);
+                               }
+                       }
+               }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //                                           Accessors
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * Gets the node from the optimizer DAG for which this plan candidate 
node was created.
+        * 
+        * @return The optimizer's DAG node.
+        */
+       public OptimizerNode getOriginalOptimizerNode() {
+               return this.template;
+       }
+       
+       /**
+        * Gets the program operator that this node represents in the plan.
+        * 
+        * @return The program operator this node represents in the plan.
+        */
+       public Operator<?> getProgramOperator() {
+               return this.template.getOperator();
+       }
+       
+       /**
+        * Gets the name of the plan node.
+        * 
+        * @return The name of the plan node.
+        */
+       public String getNodeName() {
+               return this.nodeName;
+       }
+       
+       public int getMemoryConsumerWeight() {
+               return this.driverStrategy.isMaterializing() ? 1 : 0;
+       }
+       
+       /**
+        * Gets the memory dedicated to each sub-task for this node.
+        * 
+        * @return The memory per task, in bytes.
+        */
+       public double getRelativeMemoryPerSubTask() {
+               return this.relativeMemoryPerSubTask;
+       }
+
+       /**
+        * Sets the memory dedicated to each task for this node.
+        * 
+        * @param relativeMemoryPerSubtask The relative memory per sub-task
+        */
+       public void setRelativeMemoryPerSubtask(double 
relativeMemoryPerSubtask) {
+               this.relativeMemoryPerSubTask = relativeMemoryPerSubtask;
+       }
+       
+       /**
+        * Gets the driver strategy from this node. This determines for example 
for a <i>match</i> Pact whether
+        * to use a merge or a hybrid hash strategy.
+        * 
+        * @return The driver strategy.
+        */
+       public DriverStrategy getDriverStrategy() {
+               return this.driverStrategy;
+       }
+       
+       /**
+        * Sets the driver strategy for this node. Usually should not be 
changed.
+        * 
+        * @param newDriverStrategy The driver strategy.
+        */
+       public void setDriverStrategy(DriverStrategy newDriverStrategy) {
+               this.driverStrategy = newDriverStrategy;
+       }
+       
+       public void initProperties(GlobalProperties globals, LocalProperties 
locals) {
+               if (this.globalProps != null || this.localProps != null) {
+                       throw new IllegalStateException();
+               }
+               this.globalProps = globals;
+               this.localProps = locals;
+       }
+       
+       /**
+        * Gets the local properties from this PlanNode.
+        *
+        * @return The local properties.
+        */
+       public LocalProperties getLocalProperties() {
+               return this.localProps;
+       }
+       
+       /**
+        * Gets the global properties from this PlanNode.
+        *
+        * @return The global properties.
+        */
+       public GlobalProperties getGlobalProperties() {
+               return this.globalProps;
+       }
+       
+       /**
+        * Gets the costs incurred by this node. The costs reflect also the 
costs incurred by the shipping strategies
+        * of the incoming connections.
+        * 
+        * @return The node-costs, or null, if not yet set.
+        */
+       public Costs getNodeCosts() {
+               return this.nodeCosts;
+       }
+
+       /**
+        * Gets the cumulative costs of this nose. The cumulative costs are the 
sum of the costs
+        * of this node and of all nodes in the subtree below this node.
+        * 
+        * @return The cumulative costs, or null, if not yet set.
+        */
+       public Costs getCumulativeCosts() {
+               return this.cumulativeCosts;
+       }
+
+       public Costs getCumulativeCostsShare() {
+               if (this.cumulativeCosts == null) {
+                       return null;
+               } else {
+                       Costs result = cumulativeCosts.clone();
+                       if (this.template.getOutgoingConnections() != null) {
+                               int outDegree = 
this.template.getOutgoingConnections().size();
+                               if (outDegree > 0) {
+                                       result.divideBy(outDegree);
+                               }
+                       }
+
+                       return result;
+               }
+       }
+
+       
+       /**
+        * Sets the basic cost for this node to the given value, and sets the 
cumulative costs
+        * to those costs plus the cost shares of all inputs (regular and 
broadcast).
+        * 
+        * @param nodeCosts      The already knows costs for this node
+        *                                              (this cost a produces 
by a concrete {@code OptimizerNode} subclass.
+        */
+       public void setCosts(Costs nodeCosts) {
+               // set the node costs
+               this.nodeCosts = nodeCosts;
+               
+               // the cumulative costs are the node costs plus the costs of 
all inputs
+               this.cumulativeCosts = nodeCosts.clone();
+               
+               // add all the normal inputs
+               for (PlanNode pred : getPredecessors()) {
+                       
+                       Costs parentCosts = pred.getCumulativeCostsShare();
+                       if (parentCosts != null) {
+                               this.cumulativeCosts.addCosts(parentCosts);
+                       } else {
+                               throw new CompilerException("Trying to set the 
costs of an operator before the predecessor costs are computed.");
+                       }
+               }
+               
+               // add all broadcast variable inputs
+               if (this.broadcastInputs != null) {
+                       for (NamedChannel nc : this.broadcastInputs) {
+                               Costs bcInputCost = 
nc.getSource().getCumulativeCostsShare();
+                               if (bcInputCost != null) {
+                                       
this.cumulativeCosts.addCosts(bcInputCost);
+                               } else {
+                                       throw new CompilerException("Trying to 
set the costs of an operator before the broadcast input costs are computed.");
+                               }
+                       }
+               }
+       }
+       
+       public void setParallelism(int parallelism) {
+               this.parallelism = parallelism;
+       }
+       
+       public int getParallelism() {
+               return this.parallelism;
+       }
+       
+       public long getGuaranteedAvailableMemory() {
+               return this.template.getMinimalMemoryAcrossAllSubTasks();
+       }
+
+       public Map<OptimizerNode, PlanNode> getBranchPlan() {
+               return branchPlan;
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //                               Input, Predecessors, Successors
+       // 
--------------------------------------------------------------------------------------------
+       
+       public abstract Iterable<Channel> getInputs();
+       
+       @Override
+       public abstract Iterable<PlanNode> getPredecessors();
+       
+       /**
+        * Sets a list of all broadcast inputs attached to this node.
+        */
+       public void setBroadcastInputs(List<NamedChannel> broadcastInputs) {
+               if (broadcastInputs != null) {
+                       this.broadcastInputs = broadcastInputs;
+                       
+                       // update the branch map
+                       for (NamedChannel nc : broadcastInputs) {
+                               PlanNode source = nc.getSource();
+                               
+                               mergeBranchPlanMaps(branchPlan, 
source.branchPlan);
+                       }
+               }
+               
+               // do a sanity check that if we are branching, we have now 
candidates for each branch point
+               if (this.template.hasUnclosedBranches()) {
+                       if (this.branchPlan == null) {
+                               throw new CompilerException("Branching and 
rejoining logic did not find a candidate for the branching point.");
+                       }
+       
+                       for (UnclosedBranchDescriptor uc : 
this.template.getOpenBranches()) {
+                               OptimizerNode brancher = uc.getBranchingNode();
+                               if (this.branchPlan.get(brancher) == null) {
+                                       throw new CompilerException("Branching 
and rejoining logic did not find a candidate for the branching point.");
+                               }
+                       }
+               }
+       }
+       
+       /**
+        * Gets a list of all broadcast inputs attached to this node.
+        */
+       public List<NamedChannel> getBroadcastInputs() {
+               return this.broadcastInputs;
+       }
+       
+       /**
+        * Adds a channel to a successor node to this node.
+        * 
+        * @param channel The channel to the successor.
+        */
+       public void addOutgoingChannel(Channel channel) {
+               this.outChannels.add(channel);
+       }
+       
+       /**
+        * Gets a list of all outgoing channels leading to successors.
+        * 
+        * @return A list of all channels leading to successors.
+        */
+       public List<Channel> getOutgoingChannels() {
+               return this.outChannels;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //                                Miscellaneous
+       // 
--------------------------------------------------------------------------------------------
+       
+       public void updatePropertiesWithUniqueSets(Set<FieldSet> 
uniqueFieldCombinations) {
+               if (uniqueFieldCombinations == null || 
uniqueFieldCombinations.isEmpty()) {
+                       return;
+               }
+               for (FieldSet fields : uniqueFieldCombinations) {
+                       this.globalProps.addUniqueFieldCombination(fields);
+                       this.localProps = 
this.localProps.addUniqueFields(fields);
+               }
+       }
+
+       public PlanNode getCandidateAtBranchPoint(OptimizerNode branchPoint) {
+               if (branchPlan == null) {
+                       return null;
+               } else {
+                       return this.branchPlan.get(branchPoint);
+               }
+       }
+       
+       /**
+        * Sets the pruning marker to true.
+        */
+       public void setPruningMarker() {
+               this.pFlag = true;
+       }
+       
+       /**
+        * Checks whether the pruning marker was set.
+        * 
+        * @return True, if the pruning marker was set, false otherwise.
+        */
+       public boolean isPruneMarkerSet() {
+               return this.pFlag;
+       }
+       
+       public boolean isOnDynamicPath() {
+               return this.template.isOnDynamicPath();
+       }
+       
+       public int getCostWeight() {
+               return this.template.getCostWeight();
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * Checks whether this node has a dam on the way down to the given 
source node. This method
+        * returns either that (a) the source node is not found as a 
(transitive) child of this node,
+        * (b) the node is found, but no dam is on the path, or (c) the node is 
found and a dam is on
+        * the path.
+        * 
+        * @param source The node on the path to which the dam is sought.
+        * @return The result whether the node is found and whether a dam is on 
the path.
+        */
+       public abstract SourceAndDamReport hasDamOnPathDownTo(PlanNode source);
+       
+       public FeedbackPropertiesMeetRequirementsReport 
checkPartialSolutionPropertiesMet(PlanNode partialSolution, GlobalProperties 
feedbackGlobal, LocalProperties feedbackLocal) {
+               if (this == partialSolution) {
+                       return FeedbackPropertiesMeetRequirementsReport.PENDING;
+               }
+               
+               boolean found = false;
+               boolean allMet = true;
+               boolean allLocallyMet = true;
+               
+               for (Channel input : getInputs()) {
+                       FeedbackPropertiesMeetRequirementsReport inputState = 
input.getSource().checkPartialSolutionPropertiesMet(partialSolution, 
feedbackGlobal, feedbackLocal);
+                       
+                       if (inputState == 
FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
+                               continue;
+                       }
+                       else if (inputState == 
FeedbackPropertiesMeetRequirementsReport.MET) {
+                               found = true;
+                               continue;
+                       }
+                       else if (inputState == 
FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+                               return 
FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+                       }
+                       else {
+                               found = true;
+                               
+                               // the partial solution was on the path here. 
check whether the channel requires
+                               // certain properties that are met, or whether 
the channel introduces new properties
+                               
+                               // if the plan introduces new global 
properties, then we can stop looking whether
+                               // the feedback properties are sufficient to 
meet the requirements
+                               if (input.getShipStrategy() != 
ShipStrategyType.FORWARD && input.getShipStrategy() != ShipStrategyType.NONE) {
+                                       continue;
+                               }
+                               
+                               // first check whether this channel requires 
something that is not met
+                               if (input.getRequiredGlobalProps() != null && 
!input.getRequiredGlobalProps().isMetBy(feedbackGlobal)) {
+                                       return 
FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+                               }
+                               
+                               // in general, not everything is met here 
already
+                               allMet = false;
+                               
+                               // if the plan introduces new local properties, 
we can stop checking for matching local properties
+                               if (inputState != 
FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET) {
+                                       
+                                       if (input.getLocalStrategy() == 
LocalStrategy.NONE) {
+                                               
+                                               if 
(input.getRequiredLocalProps() != null && 
!input.getRequiredLocalProps().isMetBy(feedbackLocal)) {
+                                                       return 
FeedbackPropertiesMeetRequirementsReport.NOT_MET;
+                                               }
+                                               
+                                               allLocallyMet = false;
+                                       }
+                               }
+                       }
+               }
+               
+               if (!found) {
+                       return 
FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION;
+               } else if (allMet) {
+                       return FeedbackPropertiesMeetRequirementsReport.MET;
+               } else if (allLocallyMet) {
+                       return 
FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET;
+               } else {
+                       return FeedbackPropertiesMeetRequirementsReport.PENDING;
+               }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+
+       @Override
+       public String toString() {
+               return this.template.getName() + " \"" + 
getProgramOperator().getName() + "\" : " + this.driverStrategy +
+                               " [[ " + this.globalProps + " ]] [[ " + 
this.localProps + " ]]";
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       @Override
+       public OptimizerNode getOptimizerNode() {
+               return this.template;
+       }
+
+       @Override
+       public PlanNode getPlanNode() {
+               return this;
+       }
+
+       @Override
+       public Iterable<DumpableConnection<PlanNode>> getDumpableInputs() {
+               List<DumpableConnection<PlanNode>> allInputs = new 
ArrayList<DumpableConnection<PlanNode>>();
+               
+               for (Channel c : getInputs()) {
+                       allInputs.add(c);
+               }
+               
+               for (NamedChannel c : getBroadcastInputs()) {
+                       allInputs.add(c);
+               }
+               
+               return allInputs;
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public static enum SourceAndDamReport {
+               NOT_FOUND, FOUND_SOURCE, FOUND_SOURCE_AND_DAM;
+       }
+       
+       
+       
+       public static enum FeedbackPropertiesMeetRequirementsReport {
+               /** Indicates that the path is irrelevant */
+               NO_PARTIAL_SOLUTION,
+               
+               /** Indicates that the question whether the properties are met 
has been determined pending
+                * dependent on global and local properties */
+               PENDING,
+               
+               /** Indicates that the question whether the properties are met 
has been determined pending
+                * dependent on global properties only */
+               PENDING_LOCAL_MET,
+               
+               /** Indicates that the question whether the properties are met 
has been determined true */
+               MET,
+               
+               /** Indicates that the question whether the properties are met 
has been determined false */
+               NOT_MET;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
new file mode 100644
index 0000000..b928be7
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.runtime.operators.DamBehavior;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Visitor;
+
+/**
+ * 
+ */
+public class SingleInputPlanNode extends PlanNode {
+       
+       protected final Channel input;
+       
+       protected final FieldList[] driverKeys;
+       
+       protected final boolean[][] driverSortOrders;
+       
+       private TypeComparatorFactory<?>[] comparators;
+       
+       public Object postPassHelper;
+       
+       // 
--------------------------------------------------------------------------------------------
+
+       public SingleInputPlanNode(OptimizerNode template, String nodeName, 
Channel input, DriverStrategy driverStrategy) {
+               this(template, nodeName, input, driverStrategy, null, null);
+       }
+       
+       public SingleInputPlanNode(OptimizerNode template, String nodeName, 
Channel input, 
+                       DriverStrategy driverStrategy, FieldList 
driverKeyFields)
+       {
+               this(template, nodeName, input, driverStrategy, 
driverKeyFields, getTrueArray(driverKeyFields.size()));
+       }
+       
+       public SingleInputPlanNode(OptimizerNode template, String nodeName, 
Channel input, 
+                       DriverStrategy driverStrategy, FieldList 
driverKeyFields, boolean[] driverSortOrders)
+       {
+               super(template, nodeName, driverStrategy);
+               this.input = input;
+               
+               this.comparators = new 
TypeComparatorFactory<?>[driverStrategy.getNumRequiredComparators()];
+               this.driverKeys = new 
FieldList[driverStrategy.getNumRequiredComparators()];
+               this.driverSortOrders = new 
boolean[driverStrategy.getNumRequiredComparators()][];
+               
+               if(driverStrategy.getNumRequiredComparators() > 0) {
+                       this.driverKeys[0] = driverKeyFields;
+                       this.driverSortOrders[0] = driverSortOrders;
+               }
+               
+               if (this.input.getShipStrategy() == ShipStrategyType.BROADCAST) 
{
+                       this.input.setReplicationFactor(getParallelism());
+               }
+               
+               final PlanNode predNode = input.getSource();
+               
+               if (predNode.branchPlan != null && 
!predNode.branchPlan.isEmpty()) {
+                       
+                       if (this.branchPlan == null) {
+                               this.branchPlan = new HashMap<OptimizerNode, 
PlanNode>();
+                       }
+                       this.branchPlan.putAll(predNode.branchPlan);
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       
+       public SingleInputNode getSingleInputNode() {
+               if (this.template instanceof SingleInputNode) {
+                       return (SingleInputNode) this.template;
+               } else {
+                       throw new RuntimeException();
+               }
+       }
+       
+       /**
+        * Gets the input channel to this node.
+        * 
+        * @return The input channel to this node.
+        */
+       public Channel getInput() {
+               return this.input;
+       }
+       
+       /**
+        * Gets the predecessor of this node, i.e. the source of the input 
channel.
+        * 
+        * @return The predecessor of this node.
+        */
+       public PlanNode getPredecessor() {
+               return this.input.getSource();
+       }
+       
+       /**
+        * Sets the key field indexes for the specified driver comparator.
+        * 
+        * @param keys The key field indexes for the specified driver 
comparator.
+        * @param id The ID of the driver comparator.
+        */
+       public void setDriverKeyInfo(FieldList keys, int id) {
+               this.setDriverKeyInfo(keys, getTrueArray(keys.size()), id);
+       }
+       
+       /**
+        * Sets the key field information for the specified driver comparator.
+        * 
+        * @param keys The key field indexes for the specified driver 
comparator.
+        * @param sortOrder The key sort order for the specified driver 
comparator.
+        * @param id The ID of the driver comparator.
+        */
+       public void setDriverKeyInfo(FieldList keys, boolean[] sortOrder, int 
id) {
+               if(id < 0 || id >= driverKeys.length) {
+                       throw new CompilerException("Invalid id for driver key 
information. DriverStrategy requires only "
+                                                                               
        +super.getDriverStrategy().getNumRequiredComparators()+" comparators.");
+               }
+               this.driverKeys[id] = keys;
+               this.driverSortOrders[id] = sortOrder;
+       }
+       
+       /**
+        * Gets the key field indexes for the specified driver comparator.
+        * 
+        * @param id The id of the driver comparator for which the key field 
indexes are requested.
+        * @return The key field indexes of the specified driver comparator.
+        */
+       public FieldList getKeys(int id) {
+               return this.driverKeys[id];
+       }
+       
+       /**
+        * Gets the sort order for the specified driver comparator.
+        * 
+        * @param id The id of the driver comparator for which the sort order 
is requested.
+        * @return The sort order of the specified driver comparator.
+        */
+       public boolean[] getSortOrders(int id) {
+               return driverSortOrders[id];
+       }
+       
+       /**
+        * Gets the specified comparator from this PlanNode.
+        * 
+        * @param id The ID of the requested comparator.
+        *
+        * @return The specified comparator.
+        */
+       public TypeComparatorFactory<?> getComparator(int id) {
+               return comparators[id];
+       }
+       
+       /**
+        * Sets the specified comparator for this PlanNode.
+        *
+        * @param comparator The comparator to set.
+        * @param id The ID of the comparator to set.
+        */
+       public void setComparator(TypeComparatorFactory<?> comparator, int id) {
+               this.comparators[id] = comparator;
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+
+       @Override
+       public void accept(Visitor<PlanNode> visitor) {
+               if (visitor.preVisit(this)) {
+                       this.input.getSource().accept(visitor);
+                       
+                       for (Channel broadcastInput : getBroadcastInputs()) {
+                               broadcastInput.getSource().accept(visitor);
+                       }
+                       
+                       visitor.postVisit(this);
+               }
+       }
+
+
+       @Override
+       public Iterable<PlanNode> getPredecessors() {
+               if (getBroadcastInputs() == null || 
getBroadcastInputs().isEmpty()) {
+                       return Collections.singleton(this.input.getSource());
+               }
+               else {
+                       List<PlanNode> preds = new ArrayList<PlanNode>();
+                       preds.add(input.getSource());
+                       
+                       for (Channel c : getBroadcastInputs()) {
+                               preds.add(c.getSource());
+                       }
+                       
+                       return preds;
+               }
+       }
+
+
+       @Override
+       public Iterable<Channel> getInputs() {
+               return Collections.singleton(this.input);
+       }
+
+       @Override
+       public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+               if (source == this) {
+                       return FOUND_SOURCE;
+               }
+               SourceAndDamReport res = 
this.input.getSource().hasDamOnPathDownTo(source);
+               if (res == FOUND_SOURCE_AND_DAM) {
+                       return FOUND_SOURCE_AND_DAM;
+               }
+               else if (res == FOUND_SOURCE) {
+                       return (this.input.getLocalStrategy().dams() || 
this.input.getTempMode().breaksPipeline() ||
+                                       getDriverStrategy().firstDam() == 
DamBehavior.FULL_DAM) ?
+                               FOUND_SOURCE_AND_DAM : FOUND_SOURCE;
+               }
+               else {
+                       // NOT_FOUND
+                       // check the broadcast inputs
+                       
+                       for (NamedChannel nc : getBroadcastInputs()) {
+                               SourceAndDamReport bcRes = 
nc.getSource().hasDamOnPathDownTo(source);
+                               if (bcRes != NOT_FOUND) {
+                                       // broadcast inputs are always dams
+                                       return FOUND_SOURCE_AND_DAM;
+                               }
+                       }
+                       return NOT_FOUND;
+               }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       protected static boolean[] getTrueArray(int length) {
+               final boolean[] a = new boolean[length];
+               for (int i = 0; i < length; i++) {
+                       a[i] = true;
+               }
+               return a;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
new file mode 100644
index 0000000..451484d
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import java.util.List;
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.SinkJoiner;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+
+/**
+ *
+ */
+public class SinkJoinerPlanNode extends DualInputPlanNode {
+       
+       public SinkJoinerPlanNode(SinkJoiner template, Channel input1, Channel 
input2) {
+               super(template, "", input1, input2, 
DriverStrategy.BINARY_NO_OP);
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public void setCosts(Costs nodeCosts) {
+               // the plan enumeration logic works as for regular 
two-input-operators, which is important
+               // because of the branch handling logic. it does pick 
redistributing network channels
+               // between the sink and the sink joiner, because sinks joiner 
has a different DOP than the sink.
+               // we discard any cost and simply use the sum of the costs from 
the two children.
+               
+               Costs totalCosts = 
getInput1().getSource().getCumulativeCosts().clone();
+               
totalCosts.addCosts(getInput2().getSource().getCumulativeCosts());
+               super.setCosts(totalCosts);
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public void getDataSinks(List<SinkPlanNode> sinks) {
+               final PlanNode in1 = this.input1.getSource();
+               final PlanNode in2 = this.input2.getSource();
+               
+               if (in1 instanceof SinkPlanNode) {
+                       sinks.add((SinkPlanNode) in1);
+               } else if (in1 instanceof SinkJoinerPlanNode) {
+                       ((SinkJoinerPlanNode) in1).getDataSinks(sinks);
+               } else {
+                       throw new CompilerException("Illegal child node for a 
sink joiner utility node: Neither Sink nor Sink Joiner");
+               }
+               
+               if (in2 instanceof SinkPlanNode) {
+                       sinks.add((SinkPlanNode) in2);
+               } else if (in2 instanceof SinkJoinerPlanNode) {
+                       ((SinkJoinerPlanNode) in2).getDataSinks(sinks);
+               } else {
+                       throw new CompilerException("Illegal child node for a 
sink joiner utility node: Neither Sink nor Sink Joiner");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
new file mode 100644
index 0000000..656e67f
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import org.apache.flink.optimizer.dag.DataSinkNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * Plan candidate node for data flow sinks.
+ */
+public class SinkPlanNode extends SingleInputPlanNode
+{
+       /**
+        * Constructs a new sink candidate node that uses <i>NONE</i> as its 
local strategy. Note that
+        * local sorting and range partitioning are handled by the incoming 
channel already.
+        * 
+        * @param template The template optimizer node that this candidate is 
created for.
+        */
+       public SinkPlanNode(DataSinkNode template, String nodeName, Channel 
input) {
+               super(template, nodeName, input, DriverStrategy.NONE);
+               
+               this.globalProps = input.getGlobalProperties().clone();
+               this.localProps = input.getLocalProperties().clone();
+       }
+       
+       public DataSinkNode getSinkNode() {
+               if (this.template instanceof DataSinkNode) {
+                       return (DataSinkNode) this.template;
+               } else {
+                       throw new RuntimeException();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java
new file mode 100644
index 0000000..63093dd
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.SolutionSetNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Plan candidate node for partial solution of a bulk iteration.
+ */
+public class SolutionSetPlanNode extends PlanNode {
+       
+       private static final Costs NO_COSTS = new Costs();
+       
+       private WorksetIterationPlanNode containingIterationNode;
+       
+       private final Channel initialInput;
+       
+       public Object postPassHelper;
+       
+       
+       public SolutionSetPlanNode(SolutionSetNode template, String nodeName,
+                       GlobalProperties gProps, LocalProperties lProps,
+                       Channel initialInput)
+       {
+               super(template, nodeName, DriverStrategy.NONE);
+               
+               this.globalProps = gProps;
+               this.localProps = lProps;
+               this.initialInput = initialInput;
+               
+               // the node incurs no cost
+               this.nodeCosts = NO_COSTS;
+               this.cumulativeCosts = NO_COSTS;
+               
+               if (initialInput.getSource().branchPlan != null && 
initialInput.getSource().branchPlan.size() > 0) {
+                       if (this.branchPlan == null) {
+                               this.branchPlan = new HashMap<OptimizerNode, 
PlanNode>();
+                       }
+                       
+                       
this.branchPlan.putAll(initialInput.getSource().branchPlan);
+               }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public SolutionSetNode getSolutionSetNode() {
+               return (SolutionSetNode) this.template;
+       }
+       
+       public WorksetIterationPlanNode getContainingIterationNode() {
+               return this.containingIterationNode;
+       }
+       
+       public void setContainingIterationNode(WorksetIterationPlanNode 
containingIterationNode) {
+               this.containingIterationNode = containingIterationNode;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       
+
+       @Override
+       public void accept(Visitor<PlanNode> visitor) {
+               if (visitor.preVisit(this)) {
+                       visitor.postVisit(this);
+               }
+       }
+
+
+       @Override
+       public Iterable<PlanNode> getPredecessors() {
+               return Collections.<PlanNode>emptyList();
+       }
+
+
+       @Override
+       public Iterable<Channel> getInputs() {
+               return Collections.<Channel>emptyList();
+       }
+
+
+       @Override
+       public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+               if (source == this) {
+                       return FOUND_SOURCE_AND_DAM;
+               }
+               
+               SourceAndDamReport res = 
this.initialInput.getSource().hasDamOnPathDownTo(source);
+               if (res == FOUND_SOURCE_AND_DAM || res == FOUND_SOURCE) {
+                       return FOUND_SOURCE_AND_DAM;
+               } else {
+                       return NOT_FOUND;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
new file mode 100644
index 0000000..11b7cc9
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.Collections;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.optimizer.dag.DataSourceNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Plan candidate node for data flow sources that have no input and no special 
strategies.
+ */
+public class SourcePlanNode extends PlanNode {
+       
+       private TypeSerializerFactory<?> serializer;
+       
+       /**
+        * Constructs a new source candidate node that uses <i>NONE</i> as its 
local strategy.
+        * 
+        * @param template The template optimizer node that this candidate is 
created for.
+        */
+       public SourcePlanNode(DataSourceNode template, String nodeName) {
+               this(template, nodeName, new GlobalProperties(), new 
LocalProperties());
+       }
+
+       public SourcePlanNode(DataSourceNode template, String nodeName, 
GlobalProperties gprops, LocalProperties lprops) {
+               super(template, nodeName, DriverStrategy.NONE);
+
+               this.globalProps = gprops;
+               this.localProps = lprops;
+               updatePropertiesWithUniqueSets(template.getUniqueFields());
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       
+       public DataSourceNode getDataSourceNode() {
+               return (DataSourceNode) this.template;
+       }
+       
+       /**
+        * Gets the serializer from this PlanNode.
+        *
+        * @return The serializer.
+        */
+       public TypeSerializerFactory<?> getSerializer() {
+               return serializer;
+       }
+       
+       /**
+        * Sets the serializer for this PlanNode.
+        *
+        * @param serializer The serializer to set.
+        */
+       public void setSerializer(TypeSerializerFactory<?> serializer) {
+               this.serializer = serializer;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       
+
+       @Override
+       public void accept(Visitor<PlanNode> visitor) {
+               if (visitor.preVisit(this)) {
+                       visitor.postVisit(this);
+               }
+       }
+
+
+       @Override
+       public Iterable<PlanNode> getPredecessors() {
+               return Collections.<PlanNode>emptyList();
+       }
+
+
+       @Override
+       public Iterable<Channel> getInputs() {
+               return Collections.<Channel>emptyList();
+       }
+
+
+       @Override
+       public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+               if (source == this) {
+                       return FOUND_SOURCE;
+               } else {
+                       return NOT_FOUND;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
new file mode 100644
index 0000000..880f2e3
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/**
+ * Abstract class representing Flink Streaming plans
+ * 
+ */
+public abstract class StreamingPlan implements FlinkPlan {
+
+       public abstract JobGraph getJobGraph();
+
+       public abstract String getStreamingPlanAsJSON();
+
+       public abstract void dumpStreamingPlanAsJSON(File file) throws 
IOException;
+
+}

Reply via email to