http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
new file mode 100644
index 0000000..cc12bb8
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.dag;
+
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.CostEstimator;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.InterestingProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.NamedChannel;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport;
+import org.apache.flink.optimizer.util.NoOpUnaryUdfOp;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Visitor;
+
+import com.google.common.collect.Sets;
+
+/**
+ * A node in the optimizer's program representation for an operation with a 
single input.
+ * 
+ * This class contains all the generic logic for handling branching flows, as 
well as to
+ * enumerate candidate execution plans. The subclasses for specific operators 
simply add logic
+ * for cost estimates and specify possible strategies for their execution.
+ */
+public abstract class SingleInputNode extends OptimizerNode {
+       
+       protected final FieldSet keys;                  // The set of key fields
+       
+       protected DagConnection inConn;                 // the input of the node
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * Creates a new node with a single input for the optimizer plan.
+        * 
+        * @param programOperator The PACT that the node represents.
+        */
+       protected SingleInputNode(SingleInputOperator<?, ?, ?> programOperator) 
{
+               super(programOperator);
+               
+               int[] k = programOperator.getKeyColumns(0);
+               this.keys = k == null || k.length == 0 ? null : new FieldSet(k);
+       }
+       
+       protected SingleInputNode(FieldSet keys) {
+               super(NoOpUnaryUdfOp.INSTANCE);
+               this.keys = keys;
+       }
+       
+       protected SingleInputNode() {
+               super(NoOpUnaryUdfOp.INSTANCE);
+               this.keys = null;
+       }
+       
+       protected SingleInputNode(SingleInputNode toCopy) {
+               super(toCopy);
+               
+               this.keys = toCopy.keys;
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public SingleInputOperator<?, ?, ?> getOperator() {
+               return (SingleInputOperator<?, ?, ?>) super.getOperator();
+       }
+       
+       /**
+        * Gets the input of this operator.
+        * 
+        * @return The input.
+        */
+       public DagConnection getIncomingConnection() {
+               return this.inConn;
+       }
+
+       /**
+        * Sets the connection through which this node receives its input.
+        * 
+        * @param inConn The input connection to set.
+        */
+       public void setIncomingConnection(DagConnection inConn) {
+               this.inConn = inConn;
+       }
+       
+       /**
+        * Gets the predecessor of this node.
+        * 
+        * @return The predecessor of this node. 
+        */
+       public OptimizerNode getPredecessorNode() {
+               if (this.inConn != null) {
+                       return this.inConn.getSource();
+               } else {
+                       return null;
+               }
+       }
+
+       @Override
+       public List<DagConnection> getIncomingConnections() {
+               return Collections.singletonList(this.inConn);
+       }
+       
+
+       @Override
+       public SemanticProperties getSemanticProperties() {
+               return getOperator().getSemanticProperties();
+       }
+       
+
+       @Override
+       public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, 
ExecutionMode defaultExchangeMode)
+                       throws CompilerException
+       {
+               // see if an internal hint dictates the strategy to use
+               final Configuration conf = getOperator().getParameters();
+               final String shipStrategy = 
conf.getString(Optimizer.HINT_SHIP_STRATEGY, null);
+               final ShipStrategyType preSet;
+               
+               if (shipStrategy != null) {
+                       if 
(shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH)) {
+                               preSet = ShipStrategyType.PARTITION_HASH;
+                       } else if 
(shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE)) 
{
+                               preSet = ShipStrategyType.PARTITION_RANGE;
+                       } else if 
(shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_FORWARD)) {
+                               preSet = ShipStrategyType.FORWARD;
+                       } else if 
(shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
+                               preSet = ShipStrategyType.PARTITION_RANDOM;
+                       } else {
+                               throw new CompilerException("Unrecognized ship 
strategy hint: " + shipStrategy);
+                       }
+               } else {
+                       preSet = null;
+               }
+               
+               // get the predecessor node
+               Operator<?> children = ((SingleInputOperator<?, ?, ?>) 
getOperator()).getInput();
+               
+               OptimizerNode pred;
+               DagConnection conn;
+               if (children == null) {
+                       throw new CompilerException("Error: Node for '" + 
getOperator().getName() + "' has no input.");
+               } else {
+                       pred = contractToNode.get(children);
+                       conn = new DagConnection(pred, this, 
defaultExchangeMode);
+                       if (preSet != null) {
+                               conn.setShipStrategy(preSet);
+                       }
+               }
+               
+               // create the connection and add it
+               setIncomingConnection(conn);
+               pred.addOutgoingConnection(conn);
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //                             Properties and Optimization
+       // 
--------------------------------------------------------------------------------------------
+       
+       protected abstract List<OperatorDescriptorSingle> 
getPossibleProperties();
+       
+       @Override
+       public void computeInterestingPropertiesForInputs(CostEstimator 
estimator) {
+               // get what we inherit and what is preserved by our user code 
+               final InterestingProperties props = 
getInterestingProperties().filterByCodeAnnotations(this, 0);
+               
+               // add all properties relevant to this node
+               for (OperatorDescriptorSingle dps : getPossibleProperties()) {
+                       for (RequestedGlobalProperties gp : 
dps.getPossibleGlobalProperties()) {
+                               
+                               if (gp.getPartitioning().isPartitionedOnKey()) {
+                                       // make sure that among the same 
partitioning types, we do not push anything down that has fewer key fields
+                                       
+                                       for (RequestedGlobalProperties 
contained : props.getGlobalProperties()) {
+                                               if (contained.getPartitioning() 
== gp.getPartitioning() && 
gp.getPartitionedFields().isValidSubset(contained.getPartitionedFields())) {
+                                                       
props.getGlobalProperties().remove(contained);
+                                                       break;
+                                               }
+                                       }
+                               }
+                               
+                               props.addGlobalProperties(gp);
+                       }
+                       
+                       for (RequestedLocalProperties lp : 
dps.getPossibleLocalProperties()) {
+                               props.addLocalProperties(lp);
+                       }
+               }
+               this.inConn.setInterestingProperties(props);
+               
+               for (DagConnection conn : getBroadcastConnections()) {
+                       conn.setInterestingProperties(new 
InterestingProperties());
+               }
+       }
+       
+
+       @Override
+       public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
+               // check if we have a cached version
+               if (this.cachedPlans != null) {
+                       return this.cachedPlans;
+               }
+
+               boolean childrenSkippedDueToReplicatedInput = false;
+
+               // calculate alternative sub-plans for predecessor
+               final List<? extends PlanNode> subPlans = 
getPredecessorNode().getAlternativePlans(estimator);
+               final Set<RequestedGlobalProperties> intGlobal = 
this.inConn.getInterestingProperties().getGlobalProperties();
+               
+               // calculate alternative sub-plans for broadcast inputs
+               final List<Set<? extends NamedChannel>> broadcastPlanChannels = 
new ArrayList<Set<? extends NamedChannel>>();
+               List<DagConnection> broadcastConnections = 
getBroadcastConnections();
+               List<String> broadcastConnectionNames = 
getBroadcastConnectionNames();
+
+               for (int i = 0; i < broadcastConnections.size(); i++ ) {
+                       DagConnection broadcastConnection = 
broadcastConnections.get(i);
+                       String broadcastConnectionName = 
broadcastConnectionNames.get(i);
+                       List<PlanNode> broadcastPlanCandidates = 
broadcastConnection.getSource().getAlternativePlans(estimator);
+
+                       // wrap the plan candidates in named channels
+                       HashSet<NamedChannel> broadcastChannels = new 
HashSet<NamedChannel>(broadcastPlanCandidates.size());
+                       for (PlanNode plan: broadcastPlanCandidates) {
+                               NamedChannel c = new 
NamedChannel(broadcastConnectionName, plan);
+                               DataExchangeMode exMode = 
DataExchangeMode.select(broadcastConnection.getDataExchangeMode(),
+                                                                               
ShipStrategyType.BROADCAST, broadcastConnection.isBreakingPipeline());
+                               c.setShipStrategy(ShipStrategyType.BROADCAST, 
exMode);
+                               broadcastChannels.add(c);
+                       }
+                       broadcastPlanChannels.add(broadcastChannels);
+               }
+
+               final RequestedGlobalProperties[] allValidGlobals;
+               {
+                       Set<RequestedGlobalProperties> pairs = new 
HashSet<RequestedGlobalProperties>();
+                       for (OperatorDescriptorSingle ods : 
getPossibleProperties()) {
+                               pairs.addAll(ods.getPossibleGlobalProperties());
+                       }
+                       allValidGlobals = pairs.toArray(new 
RequestedGlobalProperties[pairs.size()]);
+               }
+               final ArrayList<PlanNode> outputPlans = new 
ArrayList<PlanNode>();
+
+               final ExecutionMode executionMode = 
this.inConn.getDataExchangeMode();
+
+               final int dop = getParallelism();
+               final int inDop = getPredecessorNode().getParallelism();
+               final boolean dopChange = inDop != dop;
+
+               final boolean breaksPipeline = this.inConn.isBreakingPipeline();
+
+               // create all candidates
+               for (PlanNode child : subPlans) {
+
+                       if (child.getGlobalProperties().isFullyReplicated()) {
+                               // fully replicated input is always locally 
forwarded if DOP is not changed
+                               if (dopChange) {
+                                       // can not continue with this child
+                                       childrenSkippedDueToReplicatedInput = 
true;
+                                       continue;
+                               } else {
+                                       
this.inConn.setShipStrategy(ShipStrategyType.FORWARD);
+                               }
+                       }
+
+                       if (this.inConn.getShipStrategy() == null) {
+                               // pick the strategy ourselves
+                               for (RequestedGlobalProperties igps: intGlobal) 
{
+                                       final Channel c = new Channel(child, 
this.inConn.getMaterializationMode());
+                                       igps.parameterizeChannel(c, dopChange, 
executionMode, breaksPipeline);
+                                       
+                                       // if the DOP changed, make sure that 
we cancel out properties, unless the
+                                       // ship strategy preserves/establishes 
them even under changing DOPs
+                                       if (dopChange && 
!c.getShipStrategy().isNetworkStrategy()) {
+                                               c.getGlobalProperties().reset();
+                                       }
+                                       
+                                       // check whether we meet any of the 
accepted properties
+                                       // we may remove this check, when we do 
a check to not inherit
+                                       // requested global properties that are 
incompatible with all possible
+                                       // requested properties
+                                       for (RequestedGlobalProperties rgps: 
allValidGlobals) {
+                                               if 
(rgps.isMetBy(c.getGlobalProperties())) {
+                                                       
c.setRequiredGlobalProps(rgps);
+                                                       addLocalCandidates(c, 
broadcastPlanChannels, igps, outputPlans, estimator);
+                                                       break;
+                                               }
+                                       }
+                               }
+                       } else {
+                               // hint fixed the strategy
+                               final Channel c = new Channel(child, 
this.inConn.getMaterializationMode());
+                               final ShipStrategyType shipStrategy = 
this.inConn.getShipStrategy();
+                               final DataExchangeMode exMode = 
DataExchangeMode.select(executionMode, shipStrategy, breaksPipeline);
+
+                               if (this.keys != null) {
+                                       c.setShipStrategy(shipStrategy, 
this.keys.toFieldList(), exMode);
+                               } else {
+                                       c.setShipStrategy(shipStrategy, exMode);
+                               }
+                               
+                               if (dopChange) {
+                                       
c.adjustGlobalPropertiesForFullParallelismChange();
+                               }
+
+                               // check whether we meet any of the accepted 
properties
+                               for (RequestedGlobalProperties rgps: 
allValidGlobals) {
+                                       if 
(rgps.isMetBy(c.getGlobalProperties())) {
+                                               addLocalCandidates(c, 
broadcastPlanChannels, rgps, outputPlans, estimator);
+                                               break;
+                                       }
+                               }
+                       }
+               }
+
+               if(outputPlans.isEmpty()) {
+                       if(childrenSkippedDueToReplicatedInput) {
+                               throw new CompilerException("No plan meeting 
the requirements could be created @ " + this + ". Most likely reason: Invalid 
use of replicated input.");
+                       } else {
+                               throw new CompilerException("No plan meeting 
the requirements could be created @ " + this + ". Most likely reason: Too 
restrictive plan hints.");
+                       }
+               }
+
+               // cost and prune the plans
+               for (PlanNode node : outputPlans) {
+                       estimator.costOperator(node);
+               }
+               prunePlanAlternatives(outputPlans);
+               outputPlans.trimToSize();
+
+               this.cachedPlans = outputPlans;
+               return outputPlans;
+       }
+       
+       protected void addLocalCandidates(Channel template, List<Set<? extends 
NamedChannel>> broadcastPlanChannels, RequestedGlobalProperties rgps,
+                       List<PlanNode> target, CostEstimator estimator)
+       {
+               for (RequestedLocalProperties ilp : 
this.inConn.getInterestingProperties().getLocalProperties()) {
+                       final Channel in = template.clone();
+                       ilp.parameterizeChannel(in);
+                       
+                       // instantiate a candidate, if the instantiated local 
properties meet one possible local property set
+                       outer:
+                       for (OperatorDescriptorSingle dps: 
getPossibleProperties()) {
+                               for (RequestedLocalProperties ilps : 
dps.getPossibleLocalProperties()) {
+                                       if 
(ilps.isMetBy(in.getLocalProperties())) {
+                                               in.setRequiredLocalProps(ilps);
+                                               instantiateCandidate(dps, in, 
broadcastPlanChannels, target, estimator, rgps, ilp);
+                                               break outer;
+                                       }
+                               }
+                       }
+               }
+       }
+
+       protected void instantiateCandidate(OperatorDescriptorSingle dps, 
Channel in, List<Set<? extends NamedChannel>> broadcastPlanChannels,
+                       List<PlanNode> target, CostEstimator estimator, 
RequestedGlobalProperties globPropsReq, RequestedLocalProperties locPropsReq)
+       {
+               final PlanNode inputSource = in.getSource();
+               
+               for (List<NamedChannel> broadcastChannelsCombination: 
Sets.cartesianProduct(broadcastPlanChannels)) {
+                       
+                       boolean validCombination = true;
+                       boolean requiresPipelinebreaker = false;
+                       
+                       // check whether the broadcast inputs use the same plan 
candidate at the branching point
+                       for (int i = 0; i < 
broadcastChannelsCombination.size(); i++) {
+                               NamedChannel nc = 
broadcastChannelsCombination.get(i);
+                               PlanNode bcSource = nc.getSource();
+                               
+                               // check branch compatibility against input
+                               if (!areBranchCompatible(bcSource, 
inputSource)) {
+                                       validCombination = false;
+                                       break;
+                               }
+                               
+                               // check branch compatibility against all other 
broadcast variables
+                               for (int k = 0; k < i; k++) {
+                                       PlanNode otherBcSource = 
broadcastChannelsCombination.get(k).getSource();
+                                       
+                                       if (!areBranchCompatible(bcSource, 
otherBcSource)) {
+                                               validCombination = false;
+                                               break;
+                                       }
+                               }
+                               
+                               // check if there is a common predecessor and 
whether there is a dam on the way to all common predecessors
+                               if (this.hereJoinedBranches != null) {
+                                       for (OptimizerNode brancher : 
this.hereJoinedBranches) {
+                                               PlanNode candAtBrancher = 
in.getSource().getCandidateAtBranchPoint(brancher);
+                                               
+                                               if (candAtBrancher == null) {
+                                                       // closed branch 
between two broadcast variables
+                                                       continue;
+                                               }
+                                               
+                                               SourceAndDamReport res = 
in.getSource().hasDamOnPathDownTo(candAtBrancher);
+                                               if (res == NOT_FOUND) {
+                                                       throw new 
CompilerException("Bug: Tracing dams for deadlock detection is broken.");
+                                               } else if (res == FOUND_SOURCE) 
{
+                                                       requiresPipelinebreaker 
= true;
+                                                       break;
+                                               } else if (res == 
FOUND_SOURCE_AND_DAM) {
+                                                       // good
+                                               } else {
+                                                       throw new 
CompilerException();
+                                               }
+                                       }
+                               }
+                       }
+                       
+                       if (!validCombination) {
+                               continue;
+                       }
+                       
+                       if (requiresPipelinebreaker) {
+                               
in.setTempMode(in.getTempMode().makePipelineBreaker());
+                       }
+                       
+                       final SingleInputPlanNode node = dps.instantiate(in, 
this);
+                       node.setBroadcastInputs(broadcastChannelsCombination);
+                       
+                       // compute how the strategy affects the properties
+                       GlobalProperties gProps = 
in.getGlobalProperties().clone();
+                       LocalProperties lProps = 
in.getLocalProperties().clone();
+                       gProps = dps.computeGlobalProperties(gProps);
+                       lProps = dps.computeLocalProperties(lProps);
+
+                       SemanticProperties props = this.getSemanticProperties();
+                       // filter by the user code field copies
+                       gProps = gProps.filterBySemanticProperties(props, 0);
+                       lProps = lProps.filterBySemanticProperties(props, 0);
+                       
+                       // apply
+                       node.initProperties(gProps, lProps);
+                       node.updatePropertiesWithUniqueSets(getUniqueFields());
+                       target.add(node);
+               }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //                                     Branch Handling
+       // 
--------------------------------------------------------------------------------------------
+       
+       @Override
+       public void computeUnclosedBranchStack() {
+               if (this.openBranches != null) {
+                       return;
+               }
+
+               addClosedBranches(getPredecessorNode().closedBranchingNodes);
+               List<UnclosedBranchDescriptor> fromInput = 
getPredecessorNode().getBranchesForParent(this.inConn);
+               
+               // handle the data flow branching for the broadcast inputs
+               List<UnclosedBranchDescriptor> result = 
computeUnclosedBranchStackForBroadcastInputs(fromInput);
+               
+               this.openBranches = (result == null || result.isEmpty()) ? 
Collections.<UnclosedBranchDescriptor>emptyList() : result;
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //                                     Miscellaneous
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void accept(Visitor<OptimizerNode> visitor) {
+               if (visitor.preVisit(this)) {
+                       if (getPredecessorNode() != null) {
+                               getPredecessorNode().accept(visitor);
+                       } else {
+                               throw new CompilerException();
+                       }
+                       for (DagConnection connection : 
getBroadcastConnections()) {
+                               connection.getSource().accept(visitor);
+                       }
+                       visitor.postVisit(this);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
new file mode 100644
index 0000000..40725ba
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
+import org.apache.flink.optimizer.operators.UtilSinkJoinOpDescriptor;
+import org.apache.flink.optimizer.util.NoOpBinaryUdfOp;
+import org.apache.flink.types.Nothing;
+
+/**
+ * This class represents a utility node that is not part of the actual plan.
+ * It is used for plans with multiple data sinks to transform it into a plan 
with
+ * a single root node. That way, the code that makes sure no costs are 
double-counted and that
+ * candidate selection works correctly with nodes that have multiple outputs 
is transparently reused.
+ */
+public class SinkJoiner extends TwoInputNode {
+       
+       public SinkJoiner(OptimizerNode input1, OptimizerNode input2) {
+               super(new NoOpBinaryUdfOp<Nothing>(new NothingTypeInfo()));
+
+               DagConnection conn1 = new DagConnection(input1, this, null, 
ExecutionMode.PIPELINED);
+               DagConnection conn2 = new DagConnection(input2, this, null, 
ExecutionMode.PIPELINED);
+               
+               this.input1 = conn1;
+               this.input2 = conn2;
+               
+               setDegreeOfParallelism(1);
+       }
+       
+       @Override
+       public String getName() {
+               return "Internal Utility Node";
+       }
+       
+       @Override
+       public List<DagConnection> getOutgoingConnections() {
+               return Collections.emptyList();
+       }
+       
+       @Override
+       public void computeUnclosedBranchStack() {
+               if (this.openBranches != null) {
+                       return;
+               }
+               
+               
addClosedBranches(getFirstPredecessorNode().closedBranchingNodes);
+               
addClosedBranches(getSecondPredecessorNode().closedBranchingNodes);
+               
+               List<UnclosedBranchDescriptor> pred1branches = 
getFirstPredecessorNode().openBranches;
+               List<UnclosedBranchDescriptor> pred2branches = 
getSecondPredecessorNode().openBranches;
+               
+               // if the predecessors do not have branches, then we have 
multiple sinks that do not originate from
+               // a common data flow.
+               if (pred1branches == null || pred1branches.isEmpty()) {
+                       
+                       this.openBranches = (pred2branches == null || 
pred2branches.isEmpty()) ?
+                                       
Collections.<UnclosedBranchDescriptor>emptyList() : // both empty - 
disconnected flow
+                                       pred2branches;
+               }
+               else if (pred2branches == null || pred2branches.isEmpty()) {
+                       this.openBranches = pred1branches;
+               }
+               else {
+                       // copy the lists and merge
+                       List<UnclosedBranchDescriptor> result1 = new 
ArrayList<UnclosedBranchDescriptor>(pred1branches);
+                       List<UnclosedBranchDescriptor> result2 = new 
ArrayList<UnclosedBranchDescriptor>(pred2branches);
+                       
+                       ArrayList<UnclosedBranchDescriptor> result = new 
ArrayList<UnclosedBranchDescriptor>();
+                       mergeLists(result1, result2, result, false);
+                       
+                       this.openBranches = result.isEmpty() ? 
Collections.<UnclosedBranchDescriptor>emptyList() : result;
+               }
+       }
+
+       @Override
+       protected List<OperatorDescriptorDual> getPossibleProperties() {
+               return Collections.<OperatorDescriptorDual>singletonList(new 
UtilSinkJoinOpDescriptor());
+       }
+
+       @Override
+       public void computeOutputEstimates(DataStatistics statistics) {
+               // nothing to be done here
+       }
+
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics 
statistics) {
+               // no estimates needed at this point
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
new file mode 100644
index 0000000..1292cf5
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import 
org.apache.flink.api.common.operators.base.DeltaIterationBase.SolutionSetPlaceHolder;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
+
+/**
+ * The optimizer's internal representation of the solution set of a workset 
iteration.
+ */
+public class SolutionSetNode extends AbstractPartialSolutionNode {
+       
+       private final WorksetIterationNode iterationNode;
+       
+       
+       public SolutionSetNode(SolutionSetPlaceHolder<?> psph, 
WorksetIterationNode iterationNode) {
+               super(psph);
+               this.iterationNode = iterationNode;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       
+       public void setCandidateProperties(GlobalProperties gProps, 
LocalProperties lProps, Channel initialInput) {
+               this.cachedPlans = Collections.<PlanNode>singletonList(new 
SolutionSetPlanNode(this, "SolutionSet("+this.getOperator().getName()+")", 
gProps, lProps, initialInput));
+       }
+       
+       public SolutionSetPlanNode getCurrentSolutionSetPlanNode() {
+               if (this.cachedPlans != null) {
+                       return (SolutionSetPlanNode) this.cachedPlans.get(0);
+               } else {
+                       throw new IllegalStateException();
+               }
+       }
+       
+       public WorksetIterationNode getIterationNode() {
+               return this.iterationNode;
+       }
+       
+       @Override
+       public void computeOutputEstimates(DataStatistics statistics) {
+               
copyEstimates(this.iterationNode.getInitialSolutionSetPredecessorNode());
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Gets the contract object for this data source node.
+        * 
+        * @return The contract.
+        */
+       @Override
+       public SolutionSetPlaceHolder<?> getOperator() {
+               return (SolutionSetPlaceHolder<?>) super.getOperator();
+       }
+
+       @Override
+       public String getName() {
+               return "Solution Set";
+       }
+       
+       @Override
+       public void computeUnclosedBranchStack() {
+               if (this.openBranches != null) {
+                       return;
+               }
+
+               DagConnection solutionSetInput = 
this.iterationNode.getFirstIncomingConnection();
+               OptimizerNode solutionSetSource = solutionSetInput.getSource();
+               
+               addClosedBranches(solutionSetSource.closedBranchingNodes);
+               List<UnclosedBranchDescriptor> fromInput = 
solutionSetSource.getBranchesForParent(solutionSetInput);
+               this.openBranches = (fromInput == null || fromInput.isEmpty()) 
? Collections.<UnclosedBranchDescriptor>emptyList() : fromInput;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
new file mode 100644
index 0000000..83bc39a
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The optimizer's internal representation of a <i>SortPartition</i> operator 
node.
+ */
+public class SortPartitionNode extends SingleInputNode {
+
+       private final List<OperatorDescriptorSingle> possibleProperties;
+
+       public SortPartitionNode(SortPartitionOperatorBase<?> operator) {
+               super(operator);
+               
+               OperatorDescriptorSingle descr = new 
SortPartitionDescriptor(operator.getPartitionOrdering());
+               this.possibleProperties = Collections.singletonList(descr);
+       }
+
+       @Override
+       public SortPartitionOperatorBase<?> getOperator() {
+               return (SortPartitionOperatorBase<?>) super.getOperator();
+       }
+
+       @Override
+       public String getName() {
+               return "Sort-Partition";
+       }
+
+       @Override
+       protected List<OperatorDescriptorSingle> getPossibleProperties() {
+               return this.possibleProperties;
+       }
+
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics 
statistics) {
+               // sorting does not change the number of records
+               this.estimatedNumRecords = 
getPredecessorNode().getEstimatedNumRecords();
+               this.estimatedOutputSize = 
getPredecessorNode().getEstimatedOutputSize();
+       }
+       
+       @Override
+       public SemanticProperties getSemanticProperties() {
+               return new 
SingleInputSemanticProperties.AllFieldsForwardedProperties();
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public static class SortPartitionDescriptor extends 
OperatorDescriptorSingle {
+
+               private Ordering partitionOrder;
+
+               public SortPartitionDescriptor(Ordering partitionOrder) {
+                       this.partitionOrder = partitionOrder;
+               }
+               
+               @Override
+               public DriverStrategy getStrategy() {
+                       return DriverStrategy.UNARY_NO_OP;
+               }
+
+               @Override
+               public SingleInputPlanNode instantiate(Channel in, 
SingleInputNode node) {
+                       return new SingleInputPlanNode(node, "Sort-Partition", 
in, DriverStrategy.UNARY_NO_OP);
+               }
+
+               @Override
+               protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
+                       // sort partition does not require any global property
+                       return Collections.singletonList(new 
RequestedGlobalProperties());
+               }
+
+               @Override
+               protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
+                       // set partition order as required local property
+                       RequestedLocalProperties rlp = new 
RequestedLocalProperties();
+                       rlp.setOrdering(this.partitionOrder);
+
+                       return Collections.singletonList(rlp);
+               }
+               
+               @Override
+               public GlobalProperties 
computeGlobalProperties(GlobalProperties gProps) {
+                       // sort partition is a no-operation operation, such 
that all global properties are preserved.
+                       return gProps;
+               }
+               
+               @Override
+               public LocalProperties computeLocalProperties(LocalProperties 
lProps) {
+                       // sort partition is a no-operation operation, such 
that all global properties are preserved.
+                       return lProps;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TempMode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TempMode.java 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TempMode.java
new file mode 100644
index 0000000..0d1dfc9
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TempMode.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+/**
+ * Enumeration to indicate the mode of temporarily materializing the data that 
flows across a connection.
+ * Introducing such an artificial dam is sometimes necessary to avoid that a 
certain data flows deadlock
+ * themselves, or as a cache to replay an intermediate result.
+ */
+public enum TempMode {
+       
+       NONE(false, false),
+       PIPELINE_BREAKER(false, true),
+       CACHED(true, false),
+       CACHING_PIPELINE_BREAKER(true, true);
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       private final boolean cached;
+       
+       private final boolean breaksPipeline;
+       
+       
+       private TempMode(boolean cached, boolean breaksPipeline) {
+               this.cached = cached;
+               this.breaksPipeline = breaksPipeline;
+       }
+
+       public boolean isCached() {
+               return cached;
+       }
+
+       public boolean breaksPipeline() {
+               return breaksPipeline;
+       }
+       
+       public TempMode makePipelineBreaker() {
+               if (this == NONE) {
+                       return PIPELINE_BREAKER;
+               } else if (this == CACHED) {
+                       return CACHING_PIPELINE_BREAKER;
+               } else {
+                       return this;
+               }
+       }
+       
+       public TempMode makeCached() {
+               if (this == NONE) {
+                       return CACHED;
+               } else if (this == PIPELINE_BREAKER) {
+                       return CACHING_PIPELINE_BREAKER;
+               } else {
+                       return this;
+               }
+       }
+       
+       
+       public TempMode makeNonCached() {
+               if (this == CACHED) {
+                       return NONE;
+               } else if (this == CACHING_PIPELINE_BREAKER) {
+                       return PIPELINE_BREAKER;
+               } else {
+                       return this;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
new file mode 100644
index 0000000..39da165
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.dag;
+
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.operators.DualInputOperator;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.CostEstimator;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.InterestingProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
+import 
org.apache.flink.optimizer.operators.OperatorDescriptorDual.GlobalPropertiesPair;
+import 
org.apache.flink.optimizer.operators.OperatorDescriptorDual.LocalPropertiesPair;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.NamedChannel;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.operators.DamBehavior;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Visitor;
+
+import com.google.common.collect.Sets;
+
+/**
+ * A node in the optimizer plan that represents a PACT with a two different 
inputs, such as MATCH or CROSS.
+ * The two inputs are not substitutable in their sides.
+ */
+public abstract class TwoInputNode extends OptimizerNode {
+       
+       protected final FieldList keys1; // The set of key fields for the first 
input
+       
+       protected final FieldList keys2; // The set of key fields for the 
second input
+       
+       protected DagConnection input1; // The first input edge
+
+       protected DagConnection input2; // The second input edge
+       
+       private List<OperatorDescriptorDual> cachedDescriptors;
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * Creates a new node with a single input for the optimizer plan.
+        * 
+        * @param pactContract
+        *        The PACT that the node represents.
+        */
+       public TwoInputNode(DualInputOperator<?, ?, ?, ?> pactContract) {
+               super(pactContract);
+
+               int[] k1 = pactContract.getKeyColumns(0);
+               int[] k2 = pactContract.getKeyColumns(1);
+               
+               this.keys1 = k1 == null || k1.length == 0 ? null : new 
FieldList(k1);
+               this.keys2 = k2 == null || k2.length == 0 ? null : new 
FieldList(k2);
+               
+               if (this.keys1 != null) {
+                       if (this.keys2 != null) {
+                               if (this.keys1.size() != this.keys2.size()) {
+                                       throw new CompilerException("Unequal 
number of key fields on the two inputs.");
+                               }
+                       } else {
+                               throw new CompilerException("Keys are set on 
first input, but not on second.");
+                       }
+               } else if (this.keys2 != null) {
+                       throw new CompilerException("Keys are set on second 
input, but not on first.");
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public DualInputOperator<?, ?, ?, ?> getOperator() {
+               return (DualInputOperator<?, ?, ?, ?>) super.getOperator();
+       }
+
+       /**
+        * Gets the <tt>PactConnection</tt> through which this node receives 
its <i>first</i> input.
+        * 
+        * @return The first input connection.
+        */
+       public DagConnection getFirstIncomingConnection() {
+               return this.input1;
+       }
+
+       /**
+        * Gets the <tt>PactConnection</tt> through which this node receives 
its <i>second</i> input.
+        * 
+        * @return The second input connection.
+        */
+       public DagConnection getSecondIncomingConnection() {
+               return this.input2;
+       }
+       
+       public OptimizerNode getFirstPredecessorNode() {
+               if(this.input1 != null) {
+                       return this.input1.getSource();
+               } else {
+                       return null;
+               }
+       }
+
+       public OptimizerNode getSecondPredecessorNode() {
+               if(this.input2 != null) {
+                       return this.input2.getSource();
+               } else {
+                       return null;
+               }
+       }
+
+       @Override
+       public List<DagConnection> getIncomingConnections() {
+               ArrayList<DagConnection> inputs = new 
ArrayList<DagConnection>(2);
+               inputs.add(input1);
+               inputs.add(input2);
+               return inputs;
+       }
+
+
+       @Override
+       public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, 
ExecutionMode defaultExecutionMode) {
+               // see if there is a hint that dictates which shipping strategy 
to use for BOTH inputs
+               final Configuration conf = getOperator().getParameters();
+               ShipStrategyType preSet1 = null;
+               ShipStrategyType preSet2 = null;
+               
+               String shipStrategy = 
conf.getString(Optimizer.HINT_SHIP_STRATEGY, null);
+               if (shipStrategy != null) {
+                       if 
(Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
+                               preSet1 = preSet2 = ShipStrategyType.FORWARD;
+                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
+                               preSet1 = preSet2 = ShipStrategyType.BROADCAST;
+                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
+                               preSet1 = preSet2 = 
ShipStrategyType.PARTITION_HASH;
+                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
+                               preSet1 = preSet2 = 
ShipStrategyType.PARTITION_RANGE;
+                       } else if 
(shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
+                               preSet1 = preSet2 = 
ShipStrategyType.PARTITION_RANDOM;
+                       } else {
+                               throw new CompilerException("Unknown hint for 
shipping strategy: " + shipStrategy);
+                       }
+               }
+
+               // see if there is a hint that dictates which shipping strategy 
to use for the FIRST input
+               shipStrategy = 
conf.getString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, null);
+               if (shipStrategy != null) {
+                       if 
(Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
+                               preSet1 = ShipStrategyType.FORWARD;
+                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
+                               preSet1 = ShipStrategyType.BROADCAST;
+                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
+                               preSet1 = ShipStrategyType.PARTITION_HASH;
+                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
+                               preSet1 = ShipStrategyType.PARTITION_RANGE;
+                       } else if 
(shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
+                               preSet1 = ShipStrategyType.PARTITION_RANDOM;
+                       } else {
+                               throw new CompilerException("Unknown hint for 
shipping strategy of input one: " + shipStrategy);
+                       }
+               }
+
+               // see if there is a hint that dictates which shipping strategy 
to use for the SECOND input
+               shipStrategy = 
conf.getString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, null);
+               if (shipStrategy != null) {
+                       if 
(Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
+                               preSet2 = ShipStrategyType.FORWARD;
+                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
+                               preSet2 = ShipStrategyType.BROADCAST;
+                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
+                               preSet2 = ShipStrategyType.PARTITION_HASH;
+                       } else if 
(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
+                               preSet2 = ShipStrategyType.PARTITION_RANGE;
+                       } else if 
(shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
+                               preSet2 = ShipStrategyType.PARTITION_RANDOM;
+                       } else {
+                               throw new CompilerException("Unknown hint for 
shipping strategy of input two: " + shipStrategy);
+                       }
+               }
+               
+               // get the predecessors
+               DualInputOperator<?, ?, ?, ?> contr = getOperator();
+               
+               Operator<?> leftPred = contr.getFirstInput();
+               Operator<?> rightPred = contr.getSecondInput();
+               
+               OptimizerNode pred1;
+               DagConnection conn1;
+               if (leftPred == null) {
+                       throw new CompilerException("Error: Node for '" + 
getOperator().getName() + "' has no input set for first input.");
+               } else {
+                       pred1 = contractToNode.get(leftPred);
+                       conn1 = new DagConnection(pred1, this, 
defaultExecutionMode);
+                       if (preSet1 != null) {
+                               conn1.setShipStrategy(preSet1);
+                       }
+               } 
+               
+               // create the connection and add it
+               this.input1 = conn1;
+               pred1.addOutgoingConnection(conn1);
+               
+               OptimizerNode pred2;
+               DagConnection conn2;
+               if (rightPred == null) {
+                       throw new CompilerException("Error: Node for '" + 
getOperator().getName() + "' has no input set for second input.");
+               } else {
+                       pred2 = contractToNode.get(rightPred);
+                       conn2 = new DagConnection(pred2, this, 
defaultExecutionMode);
+                       if (preSet2 != null) {
+                               conn2.setShipStrategy(preSet2);
+                       }
+               }
+               
+               // create the connection and add it
+               this.input2 = conn2;
+               pred2.addOutgoingConnection(conn2);
+       }
+       
+       protected abstract List<OperatorDescriptorDual> getPossibleProperties();
+
+       private List<OperatorDescriptorDual> getProperties() {
+               if (this.cachedDescriptors == null) {
+                       this.cachedDescriptors = getPossibleProperties();
+               }
+               return this.cachedDescriptors;
+       }
+       
+       @Override
+       public void computeInterestingPropertiesForInputs(CostEstimator 
estimator) {
+               // get what we inherit and what is preserved by our user code 
+               final InterestingProperties props1 = 
getInterestingProperties().filterByCodeAnnotations(this, 0);
+               final InterestingProperties props2 = 
getInterestingProperties().filterByCodeAnnotations(this, 1);
+               
+               // add all properties relevant to this node
+               for (OperatorDescriptorDual dpd : getProperties()) {
+                       for (GlobalPropertiesPair gp : 
dpd.getPossibleGlobalProperties()) {
+                               // input 1
+                               props1.addGlobalProperties(gp.getProperties1());
+                               
+                               // input 2
+                               props2.addGlobalProperties(gp.getProperties2());
+                       }
+                       for (LocalPropertiesPair lp : 
dpd.getPossibleLocalProperties()) {
+                               // input 1
+                               props1.addLocalProperties(lp.getProperties1());
+                               
+                               // input 2
+                               props2.addLocalProperties(lp.getProperties2());
+                       }
+               }
+               this.input1.setInterestingProperties(props1);
+               this.input2.setInterestingProperties(props2);
+               
+               for (DagConnection conn : getBroadcastConnections()) {
+                       conn.setInterestingProperties(new 
InterestingProperties());
+               }
+       }
+
+       @Override
+       public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
+               // check if we have a cached version
+               if (this.cachedPlans != null) {
+                       return this.cachedPlans;
+               }
+
+               boolean childrenSkippedDueToReplicatedInput = false;
+
+               // step down to all producer nodes and calculate alternative 
plans
+               final List<? extends PlanNode> subPlans1 = 
getFirstPredecessorNode().getAlternativePlans(estimator);
+               final List<? extends PlanNode> subPlans2 = 
getSecondPredecessorNode().getAlternativePlans(estimator);
+
+               // calculate alternative sub-plans for predecessor
+               final Set<RequestedGlobalProperties> intGlobal1 = 
this.input1.getInterestingProperties().getGlobalProperties();
+               final Set<RequestedGlobalProperties> intGlobal2 = 
this.input2.getInterestingProperties().getGlobalProperties();
+               
+               // calculate alternative sub-plans for broadcast inputs
+               final List<Set<? extends NamedChannel>> broadcastPlanChannels = 
new ArrayList<Set<? extends NamedChannel>>();
+               List<DagConnection> broadcastConnections = 
getBroadcastConnections();
+               List<String> broadcastConnectionNames = 
getBroadcastConnectionNames();
+
+               for (int i = 0; i < broadcastConnections.size(); i++ ) {
+                       DagConnection broadcastConnection = 
broadcastConnections.get(i);
+                       String broadcastConnectionName = 
broadcastConnectionNames.get(i);
+                       List<PlanNode> broadcastPlanCandidates = 
broadcastConnection.getSource().getAlternativePlans(estimator);
+
+                       // wrap the plan candidates in named channels
+                       HashSet<NamedChannel> broadcastChannels = new 
HashSet<NamedChannel>(broadcastPlanCandidates.size());
+                       for (PlanNode plan: broadcastPlanCandidates) {
+                               final NamedChannel c = new 
NamedChannel(broadcastConnectionName, plan);
+                               DataExchangeMode exMode = 
DataExchangeMode.select(broadcastConnection.getDataExchangeMode(),
+                                                                               
        ShipStrategyType.BROADCAST, broadcastConnection.isBreakingPipeline());
+                               c.setShipStrategy(ShipStrategyType.BROADCAST, 
exMode);
+                               broadcastChannels.add(c);
+                       }
+                       broadcastPlanChannels.add(broadcastChannels);
+               }
+               
+               final GlobalPropertiesPair[] allGlobalPairs;
+               final LocalPropertiesPair[] allLocalPairs;
+               {
+                       Set<GlobalPropertiesPair> pairsGlob = new 
HashSet<GlobalPropertiesPair>();
+                       Set<LocalPropertiesPair> pairsLoc = new 
HashSet<LocalPropertiesPair>();
+                       for (OperatorDescriptorDual ods : getProperties()) {
+                               
pairsGlob.addAll(ods.getPossibleGlobalProperties());
+                               
pairsLoc.addAll(ods.getPossibleLocalProperties());
+                       }
+                       allGlobalPairs = pairsGlob.toArray(new 
GlobalPropertiesPair[pairsGlob.size()]);
+                       allLocalPairs = pairsLoc.toArray(new 
LocalPropertiesPair[pairsLoc.size()]);
+               }
+               
+               final ArrayList<PlanNode> outputPlans = new 
ArrayList<PlanNode>();
+
+               final ExecutionMode input1Mode = 
this.input1.getDataExchangeMode();
+               final ExecutionMode input2Mode = 
this.input2.getDataExchangeMode();
+
+               final int dop = getParallelism();
+               final int inDop1 = getFirstPredecessorNode().getParallelism();
+               final int inDop2 = getSecondPredecessorNode().getParallelism();
+
+               final boolean dopChange1 = dop != inDop1;
+               final boolean dopChange2 = dop != inDop2;
+
+               final boolean input1breaksPipeline = 
this.input1.isBreakingPipeline();
+               final boolean input2breaksPipeline = 
this.input2.isBreakingPipeline();
+
+               // enumerate all pairwise combination of the children's plans 
together with
+               // all possible operator strategy combination
+               
+               // create all candidates
+               for (PlanNode child1 : subPlans1) {
+
+                       if (child1.getGlobalProperties().isFullyReplicated()) {
+                               // fully replicated input is always locally 
forwarded if DOP is not changed
+                               if (dopChange1) {
+                                       // can not continue with this child
+                                       childrenSkippedDueToReplicatedInput = 
true;
+                                       continue;
+                               } else {
+                                       
this.input1.setShipStrategy(ShipStrategyType.FORWARD);
+                               }
+                       }
+
+                       for (PlanNode child2 : subPlans2) {
+
+                               if 
(child2.getGlobalProperties().isFullyReplicated()) {
+                                       // fully replicated input is always 
locally forwarded if DOP is not changed
+                                       if (dopChange2) {
+                                               // can not continue with this 
child
+                                               
childrenSkippedDueToReplicatedInput = true;
+                                               continue;
+                                       } else {
+                                               
this.input2.setShipStrategy(ShipStrategyType.FORWARD);
+                                       }
+                               }
+                               
+                               // check that the children go together. that is 
the case if they build upon the same
+                               // candidate at the joined branch plan. 
+                               if (!areBranchCompatible(child1, child2)) {
+                                       continue;
+                               }
+                               
+                               for (RequestedGlobalProperties igps1: 
intGlobal1) {
+                                       // create a candidate channel for the 
first input. mark it cached, if the connection says so
+                                       final Channel c1 = new Channel(child1, 
this.input1.getMaterializationMode());
+                                       if (this.input1.getShipStrategy() == 
null) {
+                                               // free to choose the ship 
strategy
+                                               igps1.parameterizeChannel(c1, 
dopChange1, input1Mode, input1breaksPipeline);
+                                               
+                                               // if the DOP changed, make 
sure that we cancel out properties, unless the
+                                               // ship strategy 
preserves/establishes them even under changing DOPs
+                                               if (dopChange1 && 
!c1.getShipStrategy().isNetworkStrategy()) {
+                                                       
c1.getGlobalProperties().reset();
+                                               }
+                                       }
+                                       else {
+                                               // ship strategy fixed by 
compiler hint
+                                               ShipStrategyType shipType = 
this.input1.getShipStrategy();
+                                               DataExchangeMode exMode = 
DataExchangeMode.select(input1Mode, shipType, input1breaksPipeline);
+                                               if (this.keys1 != null) {
+                                                       
c1.setShipStrategy(shipType, this.keys1.toFieldList(), exMode);
+                                               }
+                                               else {
+                                                       
c1.setShipStrategy(shipType, exMode);
+                                               }
+                                               
+                                               if (dopChange1) {
+                                                       
c1.adjustGlobalPropertiesForFullParallelismChange();
+                                               }
+                                       }
+                                       
+                                       for (RequestedGlobalProperties igps2: 
intGlobal2) {
+                                               // create a candidate channel 
for the first input. mark it cached, if the connection says so
+                                               final Channel c2 = new 
Channel(child2, this.input2.getMaterializationMode());
+                                               if 
(this.input2.getShipStrategy() == null) {
+                                                       // free to choose the 
ship strategy
+                                                       
igps2.parameterizeChannel(c2, dopChange2, input2Mode, input2breaksPipeline);
+                                                       
+                                                       // if the DOP changed, 
make sure that we cancel out properties, unless the
+                                                       // ship strategy 
preserves/establishes them even under changing DOPs
+                                                       if (dopChange2 && 
!c2.getShipStrategy().isNetworkStrategy()) {
+                                                               
c2.getGlobalProperties().reset();
+                                                       }
+                                               } else {
+                                                       // ship strategy fixed 
by compiler hint
+                                                       ShipStrategyType 
shipType = this.input2.getShipStrategy();
+                                                       DataExchangeMode exMode 
= DataExchangeMode.select(input2Mode, shipType, input2breaksPipeline);
+                                                       if (this.keys2 != null) 
{
+                                                               
c2.setShipStrategy(shipType, this.keys2.toFieldList(), exMode);
+                                                       } else {
+                                                               
c2.setShipStrategy(shipType, exMode);
+                                                       }
+                                                       
+                                                       if (dopChange2) {
+                                                               
c2.adjustGlobalPropertiesForFullParallelismChange();
+                                                       }
+                                               }
+                                               
+                                               /* 
********************************************************************
+                                                * NOTE: Depending on how we 
proceed with different partitioning,
+                                                *       we might at some point 
need a compatibility check between
+                                                *       the pairs of global 
properties.
+                                                * 
*******************************************************************/
+                                               
+                                               outer:
+                                               for (GlobalPropertiesPair gpp : 
allGlobalPairs) {
+                                                       if 
(gpp.getProperties1().isMetBy(c1.getGlobalProperties()) && 
+                                                               
gpp.getProperties2().isMetBy(c2.getGlobalProperties()) )
+                                                       {
+                                                               for 
(OperatorDescriptorDual desc : getProperties()) {
+                                                                       if 
(desc.areCompatible(gpp.getProperties1(), gpp.getProperties2(), 
+                                                                               
        c1.getGlobalProperties(), c2.getGlobalProperties()))
+                                                                       {
+                                                                               
Channel c1Clone = c1.clone();
+                                                                               
c1Clone.setRequiredGlobalProps(gpp.getProperties1());
+                                                                               
c2.setRequiredGlobalProps(gpp.getProperties2());
+                                                                               
+                                                                               
// we form a valid combination, so create the local candidates
+                                                                               
// for this
+                                                                               
addLocalCandidates(c1Clone, c2, broadcastPlanChannels, igps1, igps2,
+                                                                               
                                                                        
outputPlans, allLocalPairs, estimator);
+                                                                               
break outer;
+                                                                       }
+                                                               }
+                                                       }
+                                               }
+                                               
+                                               // break the loop over input2's 
possible global properties, if the property
+                                               // is fixed via a hint. All the 
properties are overridden by the hint anyways,
+                                               // so we can stop after the 
first
+                                               if 
(this.input2.getShipStrategy() != null) {
+                                                       break;
+                                               }
+                                       }
+                                       
+                                       // break the loop over input1's 
possible global properties, if the property
+                                       // is fixed via a hint. All the 
properties are overridden by the hint anyways,
+                                       // so we can stop after the first
+                                       if (this.input1.getShipStrategy() != 
null) {
+                                               break;
+                                       }
+                               }
+                       }
+               }
+
+               if(outputPlans.isEmpty()) {
+                       if(childrenSkippedDueToReplicatedInput) {
+                               throw new CompilerException("No plan meeting 
the requirements could be created @ " + this
+                                                                               
        + ". Most likely reason: Invalid use of replicated input.");
+                       } else {
+                               throw new CompilerException("No plan meeting 
the requirements could be created @ " + this
+                                                                               
        + ". Most likely reason: Too restrictive plan hints.");
+                       }
+               }
+
+               // cost and prune the plans
+               for (PlanNode node : outputPlans) {
+                       estimator.costOperator(node);
+               }
+               prunePlanAlternatives(outputPlans);
+               outputPlans.trimToSize();
+
+               this.cachedPlans = outputPlans;
+               return outputPlans;
+       }
+       
+       protected void addLocalCandidates(Channel template1, Channel template2, 
List<Set<? extends NamedChannel>> broadcastPlanChannels, 
+                       RequestedGlobalProperties rgps1, 
RequestedGlobalProperties rgps2,
+                       List<PlanNode> target, LocalPropertiesPair[] 
validLocalCombinations, CostEstimator estimator)
+       {
+               for (RequestedLocalProperties ilp1 : 
this.input1.getInterestingProperties().getLocalProperties()) {
+                       final Channel in1 = template1.clone();
+                       ilp1.parameterizeChannel(in1);
+                       
+                       for (RequestedLocalProperties ilp2 : 
this.input2.getInterestingProperties().getLocalProperties()) {
+                               final Channel in2 = template2.clone();
+                               ilp2.parameterizeChannel(in2);
+                               
+                               for (OperatorDescriptorDual dps: 
getProperties()) {
+                                       for (LocalPropertiesPair lpp : 
dps.getPossibleLocalProperties()) {
+                                               if 
(lpp.getProperties1().isMetBy(in1.getLocalProperties()) &&
+                                                       
lpp.getProperties2().isMetBy(in2.getLocalProperties()) )
+                                               {
+                                                       // valid combination
+                                                       // for non trivial 
local properties, we need to check that they are co compatible
+                                                       // (such as when some 
sort order is requested, that both are the same sort order
+                                                       if 
(dps.areCoFulfilled(lpp.getProperties1(), lpp.getProperties2(), 
+                                                               
in1.getLocalProperties(), in2.getLocalProperties()))
+                                                       {
+                                                               // copy, 
because setting required properties and instantiation may
+                                                               // change the 
channels and should not affect prior candidates
+                                                               Channel in1Copy 
= in1.clone();
+                                                               
in1Copy.setRequiredLocalProps(lpp.getProperties1());
+                                                               
+                                                               Channel in2Copy 
= in2.clone();
+                                                               
in2Copy.setRequiredLocalProps(lpp.getProperties2());
+                                                               
+                                                               // all right, 
co compatible
+                                                               
instantiate(dps, in1Copy, in2Copy, broadcastPlanChannels, target, estimator, 
rgps1, rgps2, ilp1, ilp2);
+                                                               break;
+                                                       }
+                                                       // else cannot use this 
pair, fall through the loop and try the next one
+                                               }
+                                       }
+                               }
+                       }
+               }
+       }
+       
+       protected void instantiate(OperatorDescriptorDual operator, Channel 
in1, Channel in2,
+                       List<Set<? extends NamedChannel>> 
broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator,
+                       RequestedGlobalProperties globPropsReq1, 
RequestedGlobalProperties globPropsReq2,
+                       RequestedLocalProperties locPropsReq1, 
RequestedLocalProperties locPropsReq2)
+       {
+               final PlanNode inputSource1 = in1.getSource();
+               final PlanNode inputSource2 = in2.getSource();
+               
+               for (List<NamedChannel> broadcastChannelsCombination: 
Sets.cartesianProduct(broadcastPlanChannels)) {
+                       
+                       boolean validCombination = true;
+                       
+                       // check whether the broadcast inputs use the same plan 
candidate at the branching point
+                       for (int i = 0; i < 
broadcastChannelsCombination.size(); i++) {
+                               NamedChannel nc = 
broadcastChannelsCombination.get(i);
+                               PlanNode bcSource = nc.getSource();
+                               
+                               if (!(areBranchCompatible(bcSource, 
inputSource1) || areBranchCompatible(bcSource, inputSource2))) {
+                                       validCombination = false;
+                                       break;
+                               }
+                               
+                               // check branch compatibility against all other 
broadcast variables
+                               for (int k = 0; k < i; k++) {
+                                       PlanNode otherBcSource = 
broadcastChannelsCombination.get(k).getSource();
+                                       
+                                       if (!areBranchCompatible(bcSource, 
otherBcSource)) {
+                                               validCombination = false;
+                                               break;
+                                       }
+                               }
+                       }
+                       
+                       if (!validCombination) {
+                               continue;
+                       }
+                       
+                       
placePipelineBreakersIfNecessary(operator.getStrategy(), in1, in2);
+                       
+                       DualInputPlanNode node = operator.instantiate(in1, in2, 
this);
+                       node.setBroadcastInputs(broadcastChannelsCombination);
+
+                       SemanticProperties props = this.getSemanticProperties();
+                       GlobalProperties gp1 = 
in1.getGlobalProperties().clone().filterBySemanticProperties(props, 0);
+                       GlobalProperties gp2 = 
in2.getGlobalProperties().clone().filterBySemanticProperties(props, 1);
+                       GlobalProperties combined = 
operator.computeGlobalProperties(gp1, gp2);
+
+                       LocalProperties lp1 = 
in1.getLocalProperties().clone().filterBySemanticProperties(props, 0);
+                       LocalProperties lp2 = 
in2.getLocalProperties().clone().filterBySemanticProperties(props, 1);
+                       LocalProperties locals = 
operator.computeLocalProperties(lp1, lp2);
+                       
+                       node.initProperties(combined, locals);
+                       node.updatePropertiesWithUniqueSets(getUniqueFields());
+                       target.add(node);
+               }
+       }
+       
+       protected void placePipelineBreakersIfNecessary(DriverStrategy 
strategy, Channel in1, Channel in2) {
+               // before we instantiate, check for deadlocks by tracing back 
to the open branches and checking
+               // whether either no input, or all of them have a dam
+               if (this.hereJoinedBranches != null && 
this.hereJoinedBranches.size() > 0) {
+                       boolean someDamOnLeftPaths = false;
+                       boolean damOnAllLeftPaths = true;
+                       boolean someDamOnRightPaths = false;
+                       boolean damOnAllRightPaths = true;
+                       
+                       if (strategy.firstDam() == DamBehavior.FULL_DAM || 
in1.getLocalStrategy().dams() || in1.getTempMode().breaksPipeline()) {
+                               someDamOnLeftPaths = true;
+                       } else {
+                               for (OptimizerNode brancher : 
this.hereJoinedBranches) {
+                                       PlanNode candAtBrancher = 
in1.getSource().getCandidateAtBranchPoint(brancher);
+                                       
+                                       // not all candidates are found, 
because this list includes joined branched from both regular inputs and 
broadcast vars
+                                       if (candAtBrancher == null) {
+                                               continue;
+                                       }
+                                       
+                                       SourceAndDamReport res = 
in1.getSource().hasDamOnPathDownTo(candAtBrancher);
+                                       if (res == NOT_FOUND) {
+                                               throw new 
CompilerException("Bug: Tracing dams for deadlock detection is broken.");
+                                       } else if (res == FOUND_SOURCE) {
+                                               damOnAllLeftPaths = false;
+                                       } else if (res == FOUND_SOURCE_AND_DAM) 
{
+                                               someDamOnLeftPaths = true;
+                                       } else {
+                                               throw new CompilerException();
+                                       }
+                               }
+                       }
+                       
+                       if (strategy.secondDam() == DamBehavior.FULL_DAM || 
in2.getLocalStrategy().dams() || in2.getTempMode().breaksPipeline()) {
+                               someDamOnRightPaths = true;
+                       } else {
+                               for (OptimizerNode brancher : 
this.hereJoinedBranches) {
+                                       PlanNode candAtBrancher = 
in2.getSource().getCandidateAtBranchPoint(brancher);
+                                       
+                                       // not all candidates are found, 
because this list includes joined branched from both regular inputs and 
broadcast vars
+                                       if (candAtBrancher == null) {
+                                               continue;
+                                       }
+                                       
+                                       SourceAndDamReport res = 
in2.getSource().hasDamOnPathDownTo(candAtBrancher);
+                                       if (res == NOT_FOUND) {
+                                               throw new 
CompilerException("Bug: Tracing dams for deadlock detection is broken.");
+                                       } else if (res == FOUND_SOURCE) {
+                                               damOnAllRightPaths = false;
+                                       } else if (res == FOUND_SOURCE_AND_DAM) 
{
+                                               someDamOnRightPaths = true;
+                                       } else {
+                                               throw new CompilerException();
+                                       }
+                               }
+                       }
+                       
+                       // okay combinations are both all dam or both no dam
+                       if ( (damOnAllLeftPaths & damOnAllRightPaths) | 
(!someDamOnLeftPaths & !someDamOnRightPaths) ) {
+                               // good, either both materialize already on the 
way, or both fully pipeline
+                       } else {
+                               if (someDamOnLeftPaths & !damOnAllRightPaths) {
+                                       // right needs a pipeline breaker
+                                       
in2.setTempMode(in2.getTempMode().makePipelineBreaker());
+                               }
+                               
+                               if (someDamOnRightPaths & !damOnAllLeftPaths) {
+                                       // right needs a pipeline breaker
+                                       
in1.setTempMode(in1.getTempMode().makePipelineBreaker());
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public void computeUnclosedBranchStack() {
+               if (this.openBranches != null) {
+                       return;
+               }
+
+               // handle the data flow branching for the regular inputs
+               
addClosedBranches(getFirstPredecessorNode().closedBranchingNodes);
+               
addClosedBranches(getSecondPredecessorNode().closedBranchingNodes);
+               
+               List<UnclosedBranchDescriptor> result1 = 
getFirstPredecessorNode().getBranchesForParent(getFirstIncomingConnection());
+               List<UnclosedBranchDescriptor> result2 = 
getSecondPredecessorNode().getBranchesForParent(getSecondIncomingConnection());
+
+               ArrayList<UnclosedBranchDescriptor> inputsMerged = new 
ArrayList<UnclosedBranchDescriptor>();
+               mergeLists(result1, result2, inputsMerged, true);
+               
+               // handle the data flow branching for the broadcast inputs
+               List<UnclosedBranchDescriptor> result = 
computeUnclosedBranchStackForBroadcastInputs(inputsMerged);
+               
+               this.openBranches = (result == null || result.isEmpty()) ? 
Collections.<UnclosedBranchDescriptor>emptyList() : result;
+       }
+
+       @Override
+       public SemanticProperties getSemanticProperties() {
+               return getOperator().getSemanticProperties();
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //                                     Miscellaneous
+       // 
--------------------------------------------------------------------------------------------
+       
+       @Override
+       public void accept(Visitor<OptimizerNode> visitor) {
+               if (visitor.preVisit(this)) {
+                       if (this.input1 == null || this.input2 == null) {
+                               throw new CompilerException();
+                       }
+                       
+                       getFirstPredecessorNode().accept(visitor);
+                       getSecondPredecessorNode().accept(visitor);
+                       
+                       for (DagConnection connection : 
getBroadcastConnections()) {
+                               connection.getSource().accept(visitor);
+                       }
+                       
+                       visitor.postVisit(this);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
new file mode 100644
index 0000000..45ecdac
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+
+
+public class UnaryOperatorNode extends SingleInputNode {
+       
+       private final List<OperatorDescriptorSingle> operator;
+       
+       private final String name;
+
+
+       
+       public UnaryOperatorNode(String name, FieldSet keys, 
OperatorDescriptorSingle ... operators) {
+               this(name, keys, Arrays.asList(operators));
+       }
+       
+       public UnaryOperatorNode(String name, FieldSet keys, 
List<OperatorDescriptorSingle> operators) {
+               super(keys);
+               
+               this.operator = operators;
+               this.name = name;
+       }
+
+       @Override
+       protected List<OperatorDescriptorSingle> getPossibleProperties() {
+               return this.operator;
+       }
+
+       @Override
+       public String getName() {
+               return this.name;
+       }
+
+       @Override
+       public SemanticProperties getSemanticProperties() {
+               return new 
SingleInputSemanticProperties.AllFieldsForwardedProperties();
+       }
+
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics 
statistics) {
+               // we have no estimates by default
+       }
+}

Reply via email to