http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
new file mode 100644
index 0000000..e85f289
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
@@ -0,0 +1,589 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
+import org.apache.flink.api.common.operators.base.DeltaIterationBase;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.traversals.InterestingPropertyVisitor;
+import org.apache.flink.optimizer.costs.CostEstimator;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.InterestingProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
+import org.apache.flink.optimizer.operators.SolutionSetDeltaOperator;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.NamedChannel;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.plan.WorksetPlanNode;
+import org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
+import org.apache.flink.optimizer.util.NoOpBinaryUdfOp;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.types.Nothing;
+import org.apache.flink.util.Visitor;
+
+/**
+ * A node in the optimizer's program representation for a workset iteration.
+ */
+public class WorksetIterationNode extends TwoInputNode implements IterationNode {
+       
+       private static final int DEFAULT_COST_WEIGHT = 20;
+       
+       
+       private final FieldList solutionSetKeyFields;
+       
+       private final GlobalProperties partitionedProperties;
+       
+       private final List<OperatorDescriptorDual> dataProperties;
+       
+       private SolutionSetNode solutionSetNode;
+       
+       private WorksetNode worksetNode;
+       
+       private OptimizerNode solutionSetDelta;
+       
+       private OptimizerNode nextWorkset;
+       
+       private DagConnection solutionSetDeltaRootConnection;
+       
+       private DagConnection nextWorksetRootConnection;
+       
+       private SingleRootJoiner singleRoot;
+       
+       private boolean solutionDeltaImmediatelyAfterSolutionJoin;
+       
+       private final int costWeight;
+
+       // --------------------------------------------------------------------------------------------
+       
+       /**
+        * Creates a new node for the given workset (delta) iteration operator.
+        * 
+        * @param iteration The iteration operator that the node represents.
+        */
+       public WorksetIterationNode(DeltaIterationBase<?, ?> iteration) {
+               super(iteration);
+               
+               final int[] ssKeys = iteration.getSolutionSetKeyFields();
+               if (ssKeys == null || ssKeys.length == 0) {
+                       throw new CompilerException("Invalid WorksetIteration: No key fields defined for the solution set.");
+               }
+               this.solutionSetKeyFields = new FieldList(ssKeys);
+               this.partitionedProperties = new GlobalProperties();
+               this.partitionedProperties.setHashPartitioned(this.solutionSetKeyFields);
+               
+               int weight = iteration.getMaximumNumberOfIterations() > 0 ? 
+                       iteration.getMaximumNumberOfIterations() : DEFAULT_COST_WEIGHT;
+                       
+               if (weight > OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT) {
+                       weight = OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT;
+               }
+               this.costWeight = weight; 
+               
+               this.dataProperties = Collections.<OperatorDescriptorDual>singletonList(new WorksetOpDescriptor(this.solutionSetKeyFields));
+       }
+
+       // --------------------------------------------------------------------------------------------
+       
+       public DeltaIterationBase<?, ?> getIterationContract() {
+               return (DeltaIterationBase<?, ?>) getOperator();
+       }
+       
+       public SolutionSetNode getSolutionSetNode() {
+               return this.solutionSetNode;
+       }
+       
+       public WorksetNode getWorksetNode() {
+               return this.worksetNode;
+       }
+       
+       public OptimizerNode getNextWorkset() {
+               return this.nextWorkset;
+       }
+       
+       public OptimizerNode getSolutionSetDelta() {
+               return this.solutionSetDelta;
+       }
+
+       public void setPartialSolution(SolutionSetNode solutionSetNode, WorksetNode worksetNode) {
+               if (this.solutionSetNode != null || this.worksetNode != null) {
+                       throw new IllegalStateException("Error: Initializing WorksetIterationNode multiple times.");
+               }
+               this.solutionSetNode = solutionSetNode;
+               this.worksetNode = worksetNode;
+       }
+       
+       public void setNextPartialSolution(OptimizerNode solutionSetDelta, OptimizerNode nextWorkset,
+                       ExecutionMode executionMode) {
+
+               // check whether the next partial solution is itself the join with
+               // the partial solution (so we can potentially do direct updates)
+               if (solutionSetDelta instanceof TwoInputNode) {
+                       TwoInputNode solutionDeltaTwoInput = (TwoInputNode) solutionSetDelta;
+                       if (solutionDeltaTwoInput.getFirstPredecessorNode() == this.solutionSetNode ||
+                               solutionDeltaTwoInput.getSecondPredecessorNode() == this.solutionSetNode)
+                       {
+                               this.solutionDeltaImmediatelyAfterSolutionJoin = true;
+                       }
+               }
+               
+               // there needs to be at least one node in the workset path, so
+               // if the next workset is equal to the workset, we need to inject a no-op node
+               if (nextWorkset == worksetNode || nextWorkset instanceof BinaryUnionNode) {
+                       NoOpNode noop = new NoOpNode();
+                       noop.setDegreeOfParallelism(getParallelism());
+
+                       DagConnection noOpConn = new DagConnection(nextWorkset, noop, executionMode);
+                       noop.setIncomingConnection(noOpConn);
+                       nextWorkset.addOutgoingConnection(noOpConn);
+                       
+                       nextWorkset = noop;
+               }
+               
+               // attach an extra node to the solution set delta for the cases where we need to repartition
+               UnaryOperatorNode solutionSetDeltaUpdateAux = new UnaryOperatorNode("Solution-Set Delta", getSolutionSetKeyFields(),
+                               new SolutionSetDeltaOperator(getSolutionSetKeyFields()));
+               solutionSetDeltaUpdateAux.setDegreeOfParallelism(getParallelism());
+
+               DagConnection conn = new DagConnection(solutionSetDelta, solutionSetDeltaUpdateAux, executionMode);
+               solutionSetDeltaUpdateAux.setIncomingConnection(conn);
+               solutionSetDelta.addOutgoingConnection(conn);
+               
+               this.solutionSetDelta = solutionSetDeltaUpdateAux;
+               this.nextWorkset = nextWorkset;
+               
+               this.singleRoot = new SingleRootJoiner();
+               this.solutionSetDeltaRootConnection = new DagConnection(solutionSetDeltaUpdateAux, this.singleRoot, executionMode);
+
+               this.nextWorksetRootConnection = new DagConnection(nextWorkset, this.singleRoot, executionMode);
+               this.singleRoot.setInputs(this.solutionSetDeltaRootConnection, this.nextWorksetRootConnection);
+               
+               solutionSetDeltaUpdateAux.addOutgoingConnection(this.solutionSetDeltaRootConnection);
+               nextWorkset.addOutgoingConnection(this.nextWorksetRootConnection);
+       }
+       
+       public int getCostWeight() {
+               return this.costWeight;
+       }
+       
+       public TwoInputNode getSingleRootOfStepFunction() {
+               return this.singleRoot;
+       }
+       
+       public FieldList getSolutionSetKeyFields() {
+               return this.solutionSetKeyFields;
+       }
+       
+       public OptimizerNode getInitialSolutionSetPredecessorNode() {
+               return getFirstPredecessorNode();
+       }
+       
+       public OptimizerNode getInitialWorksetPredecessorNode() {
+               return getSecondPredecessorNode();
+       }
+
+       // --------------------------------------------------------------------------------------------
+       
+       @Override
+       public String getName() {
+               return "Workset Iteration";
+       }
+
+       @Override
+       public SemanticProperties getSemanticProperties() {
+               return new EmptySemanticProperties();
+       }
+
+       protected void readStubAnnotations() {}
+       
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+               this.estimatedOutputSize = getFirstPredecessorNode().getEstimatedOutputSize();
+               this.estimatedNumRecords = getFirstPredecessorNode().getEstimatedNumRecords();
+       }
+       
+       // --------------------------------------------------------------------------------------------
+       //                             Properties and Optimization
+       // --------------------------------------------------------------------------------------------
+       
+       @Override
+       protected List<OperatorDescriptorDual> getPossibleProperties() {
+               return this.dataProperties;
+       }
+       
+       @Override
+       public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
+               // our own solution (the solution set) is always partitioned and this cannot be adjusted
+               // depending on what the successor to the workset iteration requests. for that reason,
+               // we ignore incoming interesting properties.
+               
+               // in addition, we need to make 2 interesting property passes, because the root of the step function
+               // that computes the next workset needs the interesting properties as generated by the
+               // workset source of the step function. the second pass concerns only the workset path.
+               // as initial interesting properties, we have the trivial ones for the step function,
+               // and partitioned on the solution set key for the solution set delta
+               
+               RequestedGlobalProperties partitionedProperties = new RequestedGlobalProperties();
+               partitionedProperties.setHashPartitioned(this.solutionSetKeyFields);
+               InterestingProperties partitionedIP = new InterestingProperties();
+               partitionedIP.addGlobalProperties(partitionedProperties);
+               partitionedIP.addLocalProperties(new RequestedLocalProperties());
+               
+               this.nextWorksetRootConnection.setInterestingProperties(new InterestingProperties());
+               this.solutionSetDeltaRootConnection.setInterestingProperties(partitionedIP.clone());
+               
+               InterestingPropertyVisitor ipv = new InterestingPropertyVisitor(estimator);
+               this.nextWorkset.accept(ipv);
+               this.solutionSetDelta.accept(ipv);
+               
+               // take the interesting properties of the partial solution and add them to the root interesting properties
+               InterestingProperties worksetIntProps = this.worksetNode.getInterestingProperties();
+               InterestingProperties intProps = new InterestingProperties();
+               intProps.getGlobalProperties().addAll(worksetIntProps.getGlobalProperties());
+               intProps.getLocalProperties().addAll(worksetIntProps.getLocalProperties());
+               
+               // clear all interesting properties to prepare the second traversal
+               this.nextWorksetRootConnection.clearInterestingProperties();
+               this.nextWorkset.accept(InterestingPropertiesClearer.INSTANCE);
+               
+               // 2nd pass
+               this.nextWorksetRootConnection.setInterestingProperties(intProps);
+               this.nextWorkset.accept(ipv);
+               
+               // now add the interesting properties of the workset to the workset input
+               final InterestingProperties inProps = this.worksetNode.getInterestingProperties().clone();
+               inProps.addGlobalProperties(new RequestedGlobalProperties());
+               inProps.addLocalProperties(new RequestedLocalProperties());
+               this.input2.setInterestingProperties(inProps);
+               
+               // the partial solution must be hash partitioned, so it has only that as interesting properties
+               this.input1.setInterestingProperties(partitionedIP);
+       }
+       
+       @Override
+       public void clearInterestingProperties() {
+               super.clearInterestingProperties();
+               
+               this.nextWorksetRootConnection.clearInterestingProperties();
+               this.solutionSetDeltaRootConnection.clearInterestingProperties();
+               
+               this.nextWorkset.accept(InterestingPropertiesClearer.INSTANCE);
+               this.solutionSetDelta.accept(InterestingPropertiesClearer.INSTANCE);
+       }
+       
+       @Override
+       protected void instantiate(OperatorDescriptorDual operator, Channel solutionSetIn, Channel worksetIn,
+                       List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator,
+                       RequestedGlobalProperties globPropsReqSolutionSet, RequestedGlobalProperties globPropsReqWorkset,
+                       RequestedLocalProperties locPropsReqSolutionSet, RequestedLocalProperties locPropsReqWorkset)
+       {
+               // check for pipeline breaking using hash join with build on the solution set side
+               placePipelineBreakersIfNecessary(DriverStrategy.HYBRIDHASH_BUILD_FIRST, solutionSetIn, worksetIn);
+               
+               // NOTES ON THE ENUMERATION OF THE STEP FUNCTION PLANS:
+               // Whenever we instantiate the iteration, we enumerate new candidates for the step function.
+               // That way, we make sure that for each candidate for the initial partial solution,
+               // we have a fitting candidate for the step function (often, work is pushed out of the step function).
+               // Among the candidates of the step function, we keep only those that meet the requested properties of the
+               // current candidate initial partial solution. That makes sure these properties exist at the beginning of
+               // every iteration.
+               
+               // 1) Because we enumerate multiple times, we may need to clean the cached plans
+               //    before starting another enumeration
+               this.nextWorkset.accept(PlanCacheCleaner.INSTANCE);
+               this.solutionSetDelta.accept(PlanCacheCleaner.INSTANCE);
+               
+               // 2) Give the partial solution the properties of the current candidate for the initial partial solution
+               //    This concerns currently only the workset.
+               this.worksetNode.setCandidateProperties(worksetIn.getGlobalProperties(), worksetIn.getLocalProperties(), worksetIn);
+               this.solutionSetNode.setCandidateProperties(this.partitionedProperties, new LocalProperties(), solutionSetIn);
+               
+               final SolutionSetPlanNode sspn = this.solutionSetNode.getCurrentSolutionSetPlanNode();
+               final WorksetPlanNode wspn = this.worksetNode.getCurrentWorksetPlanNode();
+               
+               // 3) Get the alternative plans
+               List<PlanNode> solutionSetDeltaCandidates = this.solutionSetDelta.getAlternativePlans(estimator);
+               List<PlanNode> worksetCandidates = this.nextWorkset.getAlternativePlans(estimator);
+               
+               // 4) Throw away all that are not compatible with the properties currently requested to the
+               //    initial partial solution
+               
+               // Make sure that the workset candidates fulfill the input requirements
+               {
+                       List<PlanNode> newCandidates = new ArrayList<PlanNode>();
+                       
+                       for (Iterator<PlanNode> planDeleter = worksetCandidates.iterator(); planDeleter.hasNext(); ) {
+                               PlanNode candidate = planDeleter.next();
+                               
+                               GlobalProperties atEndGlobal = candidate.getGlobalProperties();
+                               LocalProperties atEndLocal = candidate.getLocalProperties();
+                               
+                               FeedbackPropertiesMeetRequirementsReport report = candidate.checkPartialSolutionPropertiesMet(wspn, atEndGlobal, atEndLocal);
+
+                               if (report == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
+                                       ; // depends only through broadcast variable on the workset solution
+                               }
+                               else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+                                       // attach a no-op node through which we create the properties of the original input
+                                       Channel toNoOp = new Channel(candidate);
+                                       globPropsReqWorkset.parameterizeChannel(toNoOp, false,
+                                                       nextWorksetRootConnection.getDataExchangeMode(), false);
+                                       locPropsReqWorkset.parameterizeChannel(toNoOp);
+                                       
+                                       UnaryOperatorNode rebuildWorksetPropertiesNode = new UnaryOperatorNode("Rebuild Workset Properties",
+                                                       FieldList.EMPTY_LIST);
+                                       
+                                       rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getParallelism());
+                                       
+                                       SingleInputPlanNode rebuildWorksetPropertiesPlanNode = new SingleInputPlanNode(
+                                                       rebuildWorksetPropertiesNode, "Rebuild Workset Properties",
+                                                       toNoOp, DriverStrategy.UNARY_NO_OP);
+                                       rebuildWorksetPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(),
+                                                       toNoOp.getLocalProperties());
+                                       estimator.costOperator(rebuildWorksetPropertiesPlanNode);
+                                       
+                                       GlobalProperties atEndGlobalModified = rebuildWorksetPropertiesPlanNode.getGlobalProperties();
+                                       LocalProperties atEndLocalModified = rebuildWorksetPropertiesPlanNode.getLocalProperties();
+                                       
+                                       if (!(atEndGlobalModified.equals(atEndGlobal) && atEndLocalModified.equals(atEndLocal))) {
+                                               FeedbackPropertiesMeetRequirementsReport report2 = candidate.checkPartialSolutionPropertiesMet(
+                                                               wspn, atEndGlobalModified, atEndLocalModified);
+                                               if (report2 != FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+                                                       newCandidates.add(rebuildWorksetPropertiesPlanNode);
+                                               }
+                                       }
+                                       
+                                       // remove the original operator and add the modified candidate
+                                       planDeleter.remove();
+                                       
+                               }
+                       }
+                       
+                       worksetCandidates.addAll(newCandidates);
+               }
+               
+               if (worksetCandidates.isEmpty()) {
+                       return;
+               }
+               
+               // sanity check the solution set delta
+               for (PlanNode solutionSetDeltaCandidate : solutionSetDeltaCandidates) {
+                       SingleInputPlanNode candidate = (SingleInputPlanNode) solutionSetDeltaCandidate;
+                       GlobalProperties gp = candidate.getGlobalProperties();
+
+                       if (gp.getPartitioning() != PartitioningProperty.HASH_PARTITIONED || gp.getPartitioningFields() == null ||
+                                       !gp.getPartitioningFields().equals(this.solutionSetKeyFields)) {
+                               throw new CompilerException("Bug: The solution set delta is not partitioned.");
+                       }
+               }
+               
+               // 5) Create a candidate for the Iteration Node for every remaining plan of the step function.
+               
+               final GlobalProperties gp = new GlobalProperties();
+               gp.setHashPartitioned(this.solutionSetKeyFields);
+               gp.addUniqueFieldCombination(this.solutionSetKeyFields);
+               
+               LocalProperties lp = LocalProperties.EMPTY.addUniqueFields(this.solutionSetKeyFields);
+               
+               // take all combinations of solution set delta and workset plans
+               for (PlanNode solutionSetCandidate : solutionSetDeltaCandidates) {
+                       for (PlanNode worksetCandidate : worksetCandidates) {
+                               // check whether they have the same operator at their latest branching point
+                               if (this.singleRoot.areBranchCompatible(solutionSetCandidate, worksetCandidate)) {
+                                       
+                                       SingleInputPlanNode siSolutionDeltaCandidate = (SingleInputPlanNode) solutionSetCandidate;
+                                       boolean immediateDeltaUpdate;
+                                       
+                                       // check whether we need a dedicated solution set delta operator, or whether we can update on the fly
+                                       if (siSolutionDeltaCandidate.getInput().getShipStrategy() == ShipStrategyType.FORWARD &&
+                                                       this.solutionDeltaImmediatelyAfterSolutionJoin)
+                                       {
+                                               // we do not need this extra node. we can make the predecessor the delta
+                                               // sanity check the node and connection
+                                               if (siSolutionDeltaCandidate.getDriverStrategy() != DriverStrategy.UNARY_NO_OP ||
+                                                               siSolutionDeltaCandidate.getInput().getLocalStrategy() != LocalStrategy.NONE)
+                                               {
+                                                       throw new CompilerException("Invalid Solution set delta node.");
+                                               }
+                                               
+                                               solutionSetCandidate = siSolutionDeltaCandidate.getInput().getSource();
+                                               immediateDeltaUpdate = true;
+                                       } else {
+                                               // was not partitioned, we need to keep this node.
+                                               // mark that we materialize the input
+                                               siSolutionDeltaCandidate.getInput().setTempMode(TempMode.PIPELINE_BREAKER);
+                                               immediateDeltaUpdate = false;
+                                       }
+                                       
+                                       WorksetIterationPlanNode wsNode = new WorksetIterationPlanNode(this,
+                                                       "WorksetIteration ("+this.getOperator().getName()+")", solutionSetIn,
+                                                       worksetIn, sspn, wspn, worksetCandidate, solutionSetCandidate);
+                                       wsNode.setImmediateSolutionSetUpdate(immediateDeltaUpdate);
+                                       wsNode.initProperties(gp, lp);
+                                       target.add(wsNode);
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public void computeUnclosedBranchStack() {
+               if (this.openBranches != null) {
+                       return;
+               }
+               
+               // IMPORTANT: First compute closed branches from the two inputs
+               // we need to do this because the runtime iteration head effectively joins
+               addClosedBranches(getFirstPredecessorNode().closedBranchingNodes);
+               addClosedBranches(getSecondPredecessorNode().closedBranchingNodes);
+
+               List<UnclosedBranchDescriptor> result1 = getFirstPredecessorNode().getBranchesForParent(getFirstIncomingConnection());
+               List<UnclosedBranchDescriptor> result2 = getSecondPredecessorNode().getBranchesForParent(getSecondIncomingConnection());
+
+               ArrayList<UnclosedBranchDescriptor> inputsMerged1 = new ArrayList<UnclosedBranchDescriptor>();
+               mergeLists(result1, result2, inputsMerged1, true); // this method also sets which branches are joined here (in the head)
+               
+               addClosedBranches(getSingleRootOfStepFunction().closedBranchingNodes);
+
+               ArrayList<UnclosedBranchDescriptor> inputsMerged2 = new ArrayList<UnclosedBranchDescriptor>();
+               List<UnclosedBranchDescriptor> result3 = getSingleRootOfStepFunction().openBranches;
+               mergeLists(inputsMerged1, result3, inputsMerged2, true);
+
+               // handle the data flow branching for the broadcast inputs
+               List<UnclosedBranchDescriptor> result = computeUnclosedBranchStackForBroadcastInputs(inputsMerged2);
+
+               this.openBranches = (result == null || result.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : result;
+       }
+       
+       // --------------------------------------------------------------------------------------------
+       //                      Iteration Specific Traversals
+       // --------------------------------------------------------------------------------------------
+
+       public void acceptForStepFunction(Visitor<OptimizerNode> visitor) {
+               this.singleRoot.accept(visitor);
+       }
+       
+       // --------------------------------------------------------------------------------------------
+       //                             Utility Classes
+       // --------------------------------------------------------------------------------------------
+       
+       private static final class WorksetOpDescriptor extends OperatorDescriptorDual {
+               
+               private WorksetOpDescriptor(FieldList solutionSetKeys) {
+                       super(solutionSetKeys, null);
+               }
+
+               @Override
+               public DriverStrategy getStrategy() {
+                       return DriverStrategy.NONE;
+               }
+
+               @Override
+               protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
+                       RequestedGlobalProperties partitionedGp = new RequestedGlobalProperties();
+                       partitionedGp.setHashPartitioned(this.keys1);
+                       return Collections.singletonList(new GlobalPropertiesPair(partitionedGp, new RequestedGlobalProperties()));
+               }
+
+               @Override
+               protected List<LocalPropertiesPair> createPossibleLocalProperties() {
+                       // all properties are possible
+                       return Collections.singletonList(new LocalPropertiesPair(
+                               new RequestedLocalProperties(), new RequestedLocalProperties()));
+               }
+               
+               @Override
+               public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
+                               GlobalProperties produced1, GlobalProperties produced2) {
+                       return true;
+               }
+               
+               @Override
+               public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
+                               LocalProperties produced1, LocalProperties produced2) {
+                       return true;
+               }
+
+               @Override
+               public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
+                       throw new UnsupportedOperationException();
+               }
+       }
+       
+       public static class SingleRootJoiner extends TwoInputNode {
+               
+               SingleRootJoiner() {
+                       super(new NoOpBinaryUdfOp<Nothing>(new NothingTypeInfo()));
+                       
+                       setDegreeOfParallelism(1);
+               }
+               
+               public void setInputs(DagConnection input1, DagConnection input2) {
+                       this.input1 = input1;
+                       this.input2 = input2;
+               }
+               
+               @Override
+               public String getName() {
+                       return "Internal Utility Node";
+               }
+
+               @Override
+               protected List<OperatorDescriptorDual> getPossibleProperties() {
+                       return Collections.emptyList();
+               }
+
+               @Override
+               protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+                       // no estimates are needed here
+               }
+       }
+}
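
For orientation, a minimal sketch (not part of this commit) of the kind of user program whose DeltaIterationBase the WorksetIterationNode above represents. The data sets, key position, and step function are illustrative; the join with the solution set directly ahead of the delta is the pattern that setNextPartialSolution() detects via solutionDeltaImmediatelyAfterSolutionJoin, and the maximum iteration count (20) is what the constructor clamps into the cost weight.

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DeltaIteration;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class DeltaIterationSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<Long, Double>> initialSolutionSet = env.fromElements(Tuple2.of(1L, 0.5));
            DataSet<Tuple2<Long, Double>> initialWorkset = env.fromElements(Tuple2.of(1L, 0.5));

            // field 0 becomes the solution set key (solutionSetKeyFields above)
            DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration =
                    initialSolutionSet.iterateDelta(initialWorkset, 20, 0);

            // getWorkset()/getSolutionSet() are the placeholders that become the
            // WorksetNode and SolutionSetNode in the optimizer DAG
            DataSet<Tuple2<Long, Double>> delta = iteration.getWorkset()
                    .join(iteration.getSolutionSet())
                    .where(0).equalTo(0)
                    .with(new JoinFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>>() {
                        @Override
                        public Tuple2<Long, Double> join(Tuple2<Long, Double> workset, Tuple2<Long, Double> solution) {
                            return Tuple2.of(workset.f0, Math.min(workset.f1, solution.f1));
                        }
                    });

            // closeWith() supplies the solution set delta and the next workset, the
            // two step-function roots that setNextPartialSolution() joins via the
            // SingleRootJoiner
            DataSet<Tuple2<Long, Double>> result = iteration.closeWith(delta, delta);
            result.print();
        }
    }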

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
new file mode 100644
index 0000000..3b05aba
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.base.DeltaIterationBase.WorksetPlaceHolder;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.WorksetPlanNode;
+
+/**
+ * The optimizer's internal representation of the workset of a workset (delta) iteration.
+ */
+public class WorksetNode extends AbstractPartialSolutionNode {
+       
+       private final WorksetIterationNode iterationNode;
+       
+       
+       public WorksetNode(WorksetPlaceHolder<?> psph, WorksetIterationNode iterationNode) {
+               super(psph);
+               this.iterationNode = iterationNode;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       
+       public void setCandidateProperties(GlobalProperties gProps, LocalProperties lProps, Channel initialInput) {
+               if (this.cachedPlans != null) {
+                       throw new IllegalStateException();
+               } else {
+                       WorksetPlanNode wspn = new WorksetPlanNode(this, "Workset ("+this.getOperator().getName()+")", gProps, lProps, initialInput);
+                       this.cachedPlans = Collections.<PlanNode>singletonList(wspn);
+               }
+       }
+       
+       public WorksetPlanNode getCurrentWorksetPlanNode() {
+               if (this.cachedPlans != null) {
+                       return (WorksetPlanNode) this.cachedPlans.get(0);
+               } else {
+                       throw new IllegalStateException();
+               }
+       }
+       
+       public WorksetIterationNode getIterationNode() {
+               return this.iterationNode;
+       }
+       
+       @Override
+       public void computeOutputEstimates(DataStatistics statistics) {
+               copyEstimates(this.iterationNode.getInitialWorksetPredecessorNode());
+       }
+       
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Gets the workset placeholder operator that this node represents.
+        * 
+        * @return The workset placeholder operator.
+        */
+       @Override
+       public WorksetPlaceHolder<?> getOperator() {
+               return (WorksetPlaceHolder<?>) super.getOperator();
+       }
+
+       @Override
+       public String getName() {
+               return "Workset";
+       }
+       
+       @Override
+       public void computeUnclosedBranchStack() {
+               if (this.openBranches != null) {
+                       return;
+               }
+
+               DagConnection worksetInput = this.iterationNode.getSecondIncomingConnection();
+               OptimizerNode worksetSource = worksetInput.getSource();
+               
+               addClosedBranches(worksetSource.closedBranchingNodes);
+               List<UnclosedBranchDescriptor> fromInput = worksetSource.getBranchesForParent(worksetInput);
+               this.openBranches = (fromInput == null || fromInput.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : fromInput;
+       }
+}
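
The branch tracking in computeUnclosedBranchStack() above matters for program shapes where one data set feeds both iteration inputs. A compact, hypothetical sketch of such a shape (names and key position illustrative):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DeltaIteration;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class BranchingDeltaIterationSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // a single branching source, used as both the initial solution set
            // and the initial workset; the WorksetNode then inherits an open
            // branch from the iteration's second input
            DataSet<Tuple2<Long, Long>> source = env.fromElements(Tuple2.of(1L, 1L));

            DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> it =
                    source.iterateDelta(source, 10, 0);

            DataSet<Tuple2<Long, Long>> delta = it.getWorkset()
                    .join(it.getSolutionSet()).where(0).equalTo(0)
                    .projectFirst(0, 1);

            it.closeWith(delta, delta).print();
        }
    }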

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
new file mode 100644
index 0000000..57ba29d
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.util.Utils;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class represents global properties of the data at a certain point in the plan.
+ * Global properties are properties that describe data across different partitions, such as
+ * whether the data is hash partitioned, range partitioned, replicated, etc.
+ */
+public class GlobalProperties implements Cloneable {
+
+       public static final Logger LOG = LoggerFactory.getLogger(GlobalProperties.class);
+       
+       private PartitioningProperty partitioning;      // the type of partitioning
+       
+       private FieldList partitioningFields;           // the fields on which the data is partitioned
+       
+       private Ordering ordering;                      // order of the partitioned fields, if it is an ordered (range) partitioning
+       
+       private Set<FieldSet> uniqueFieldCombinations;
+       
+       private Partitioner<?> customPartitioner;
+       
+       // --------------------------------------------------------------------------------------------
+       
+       /**
+        * Initializes the global properties with no partitioning.
+        */
+       public GlobalProperties() {
+               this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
+       }
+       
+       // --------------------------------------------------------------------------------------------
+       
+       /**
+        * Sets this global properties to represent a hash partitioning.
+        * 
+        * @param partitionedFields The key fields on which the data is hash partitioned.
+        */
+       public void setHashPartitioned(FieldList partitionedFields) {
+               if (partitionedFields == null) {
+                       throw new NullPointerException();
+               }
+               
+               this.partitioning = PartitioningProperty.HASH_PARTITIONED;
+               this.partitioningFields = partitionedFields;
+               this.ordering = null;
+       }
+       
+
+       public void setRangePartitioned(Ordering ordering) {
+               if (ordering == null) {
+                       throw new NullPointerException();
+               }
+               
+               this.partitioning = PartitioningProperty.RANGE_PARTITIONED;
+               this.ordering = ordering;
+               this.partitioningFields = ordering.getInvolvedIndexes();
+       }
+       
+       public void setAnyPartitioning(FieldList partitionedFields) {
+               if (partitionedFields == null) {
+                       throw new NullPointerException();
+               }
+               
+               this.partitioning = PartitioningProperty.ANY_PARTITIONING;
+               this.partitioningFields = partitionedFields;
+               this.ordering = null;
+       }
+       
+       public void setRandomPartitioned() {
+               this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
+               this.partitioningFields = null;
+               this.ordering = null;
+       }
+       
+       public void setFullyReplicated() {
+               this.partitioning = PartitioningProperty.FULL_REPLICATION;
+               this.partitioningFields = null;
+               this.ordering = null;
+       }
+       
+       public void setForcedRebalanced() {
+               this.partitioning = PartitioningProperty.FORCED_REBALANCED;
+               this.partitioningFields = null;
+               this.ordering = null;
+       }
+       
+       public void setCustomPartitioned(FieldList partitionedFields, Partitioner<?> partitioner) {
+               if (partitionedFields == null || partitioner == null) {
+                       throw new NullPointerException();
+               }
+               
+               this.partitioning = PartitioningProperty.CUSTOM_PARTITIONING;
+               this.partitioningFields = partitionedFields;
+               this.ordering = null;
+               this.customPartitioner = partitioner;
+       }
+       
+       public void addUniqueFieldCombination(FieldSet fields) {
+               if (fields == null) {
+                       return;
+               }
+               if (this.uniqueFieldCombinations == null) {
+                       this.uniqueFieldCombinations = new HashSet<FieldSet>();
+               }
+               this.uniqueFieldCombinations.add(fields);
+       }
+       
+       public void clearUniqueFieldCombinations() {
+               if (this.uniqueFieldCombinations != null) {
+                       this.uniqueFieldCombinations = null;
+               }
+       }
+       
+       public Set<FieldSet> getUniqueFieldCombination() {
+               return this.uniqueFieldCombinations;
+       }
+       
+       public FieldList getPartitioningFields() {
+               return this.partitioningFields;
+       }
+       
+       public Ordering getPartitioningOrdering() {
+               return this.ordering;
+       }
+       
+       public PartitioningProperty getPartitioning() {
+               return this.partitioning;
+       }
+       
+       public Partitioner<?> getCustomPartitioner() {
+               return this.customPartitioner;
+       }
+       
+       // --------------------------------------------------------------------------------------------
+       
+       public boolean isPartitionedOnFields(FieldSet fields) {
+               if (this.partitioning.isPartitionedOnKey() && fields.isValidSubset(this.partitioningFields)) {
+                       return true;
+               } else if (this.uniqueFieldCombinations != null) {
+                       for (FieldSet set : this.uniqueFieldCombinations) {
+                               if (fields.isValidSubset(set)) {
+                                       return true;
+                               }
+                       }
+                       return false;
+               } else {
+                       return false;
+               }
+       }
+
+       public boolean isExactlyPartitionedOnFields(FieldList fields) {
+               return this.partitioning.isPartitionedOnKey() && fields.isExactMatch(this.partitioningFields);
+       }
+       
+       public boolean matchesOrderedPartitioning(Ordering o) {
+               if (this.partitioning == PartitioningProperty.RANGE_PARTITIONED) {
+                       if (this.ordering.getNumberOfFields() > o.getNumberOfFields()) {
+                               return false;
+                       }
+                       
+                       for (int i = 0; i < this.ordering.getNumberOfFields(); i++) {
+                               if (this.ordering.getFieldNumber(i) != o.getFieldNumber(i)) {
+                                       return false;
+                               }
+                               
+                               // if this one requests no order, everything is good
+                               final Order oo = o.getOrder(i);
+                               final Order to = this.ordering.getOrder(i);
+                               if (oo != Order.NONE) {
+                                       if (oo == Order.ANY) {
+                                               // if any order is requested, any non-NONE order is good
+                                               if (to == Order.NONE) {
+                                                       return false;
+                                               }
+                                       } else if (oo != to) {
+                                               // the orders must be equal
+                                               return false;
+                                       }
+                               }
+                       }
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+
+       public boolean isFullyReplicated() {
+               return this.partitioning == PartitioningProperty.FULL_REPLICATION;
+       }
+
+       /**
+        * Checks if the properties in this object are trivial, i.e. only standard values.
+        */
+       public boolean isTrivial() {
+               return partitioning == PartitioningProperty.RANDOM_PARTITIONED;
+       }
+
+       /**
+        * This method resets the properties to a state where no properties are given.
+        */
+       public void reset() {
+               this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
+               this.ordering = null;
+               this.partitioningFields = null;
+       }
+
+       /**
+        * Filters these GlobalProperties by the fields that are forwarded to the output
+        * as described by the SemanticProperties.
+        *
+        * @param props The semantic properties holding information about forwarded fields.
+        * @param input The index of the input.
+        * @return The filtered GlobalProperties.
+        */
+       public GlobalProperties filterBySemanticProperties(SemanticProperties props, int input) {
+
+               if (props == null) {
+                       throw new NullPointerException("SemanticProperties may not be null.");
+               }
+
+               GlobalProperties gp = new GlobalProperties();
+
+               // filter partitioning
+               switch(this.partitioning) {
+                       case RANGE_PARTITIONED:
+                               // check if ordering is preserved
+                               Ordering newOrdering = new Ordering();
+                               for (int i = 0; i < this.ordering.getInvolvedIndexes().size(); i++) {
+                                       int sourceField = this.ordering.getInvolvedIndexes().get(i);
+                                       FieldSet targetField = props.getForwardingTargetFields(input, sourceField);
+
+                                       if (targetField == null || targetField.size() == 0) {
+                                               // partitioning is destroyed
+                                               newOrdering = null;
+                                               break;
+                                       } else {
+                                               // use any field of the target fields for now. We should use something like field equivalence sets in the future.
+                                               if (targetField.size() > 1) {
+                                                       LOG.warn("Found that a field is forwarded to more than one target field in " +
+                                                                       "semantic forwarded field information. Will only use the field with the lowest index.");
+                                               }
+                                               newOrdering.appendOrdering(targetField.toArray()[0], this.ordering.getType(i), this.ordering.getOrder(i));
+                                       }
+                               }
+                               if (newOrdering != null) {
+                                       gp.partitioning = PartitioningProperty.RANGE_PARTITIONED;
+                                       gp.ordering = newOrdering;
+                                       gp.partitioningFields = newOrdering.getInvolvedIndexes();
+                               }
+                               break;
+                       case HASH_PARTITIONED:
+                       case ANY_PARTITIONING:
+                       case CUSTOM_PARTITIONING:
+                               FieldList newPartitioningFields = new FieldList();
+                               for (int sourceField : this.partitioningFields) {
+                                       FieldSet targetField = props.getForwardingTargetFields(input, sourceField);
+
+                                       if (targetField == null || targetField.size() == 0) {
+                                               newPartitioningFields = null;
+                                               break;
+                                       } else {
+                                               // use any field of the target fields for now. We should use something like field equivalence sets in the future.
+                                               if (targetField.size() > 1) {
+                                                       LOG.warn("Found that a field is forwarded to more than one target field in " +
+                                                                       "semantic forwarded field information. Will only use the field with the lowest index.");
+                                               }
+                                               newPartitioningFields = newPartitioningFields.addField(targetField.toArray()[0]);
+                                       }
+                               }
+                               if (newPartitioningFields != null) {
+                                       gp.partitioning = this.partitioning;
+                                       gp.partitioningFields = newPartitioningFields;
+                                       gp.customPartitioner = this.customPartitioner;
+                               }
+                               break;
+                       case FORCED_REBALANCED:
+                       case FULL_REPLICATION:
+                       case RANDOM_PARTITIONED:
+                               gp.partitioning = this.partitioning;
+                               break;
+                       default:
+                               throw new RuntimeException("Unknown partitioning type.");
+               }
+
+               // filter unique field combinations
+               if (this.uniqueFieldCombinations != null) {
+                       Set<FieldSet> newUniqueFieldCombinations = new HashSet<FieldSet>();
+                       for (FieldSet fieldCombo : this.uniqueFieldCombinations) {
+                               FieldSet newFieldCombo = new FieldSet();
+                               for (Integer sourceField : fieldCombo) {
+                                       FieldSet targetField = props.getForwardingTargetFields(input, sourceField);
+
+                                       if (targetField == null || targetField.size() == 0) {
+                                               newFieldCombo = null;
+                                               break;
+                                       } else {
+                                               // use any field of the target fields for now. We should use something like field equivalence sets in the future.
+                                               if (targetField.size() > 1) {
+                                                       LOG.warn("Found that a field is forwarded to more than one target field in " +
+                                                                       "semantic forwarded field information. Will only use the field with the lowest index.");
+                                               }
+                                               newFieldCombo = newFieldCombo.addField(targetField.toArray()[0]);
+                                       }
+                               }
+                               if (newFieldCombo != null) {
+                                       
newUniqueFieldCombinations.add(newFieldCombo);
+                               }
+                       }
+                       if(!newUniqueFieldCombinations.isEmpty()) {
+                               gp.uniqueFieldCombinations = 
newUniqueFieldCombinations;
+                       }
+               }
+
+               return gp;
+       }
+
+
+       public void parameterizeChannel(Channel channel, boolean globalDopChange,
+                                                                       ExecutionMode exchangeMode, boolean breakPipeline) {
+
+               ShipStrategyType shipType;
+               FieldList partitionKeys;
+               boolean[] sortDirection;
+               Partitioner<?> partitioner;
+
+               switch (this.partitioning) {
+                       case RANDOM_PARTITIONED:
+                               shipType = globalDopChange ? ShipStrategyType.PARTITION_RANDOM : ShipStrategyType.FORWARD;
+                               partitionKeys = null;
+                               sortDirection = null;
+                               partitioner = null;
+                               break;
+
+                       case FULL_REPLICATION:
+                               shipType = ShipStrategyType.BROADCAST;
+                               partitionKeys = null;
+                               sortDirection = null;
+                               partitioner = null;
+                               break;
+
+                       case ANY_PARTITIONING:
+                       case HASH_PARTITIONED:
+                               shipType = ShipStrategyType.PARTITION_HASH;
+                               partitionKeys = Utils.createOrderedFromSet(this.partitioningFields);
+                               sortDirection = null;
+                               partitioner = null;
+                               break;
+
+                       case RANGE_PARTITIONED:
+                               shipType = ShipStrategyType.PARTITION_RANGE;
+                               partitionKeys = this.ordering.getInvolvedIndexes();
+                               sortDirection = this.ordering.getFieldSortDirections();
+                               partitioner = null;
+                               break;
+
+                       case FORCED_REBALANCED:
+                               shipType = ShipStrategyType.PARTITION_RANDOM;
+                               partitionKeys = null;
+                               sortDirection = null;
+                               partitioner = null;
+                               break;
+
+                       case CUSTOM_PARTITIONING:
+                               shipType = ShipStrategyType.PARTITION_CUSTOM;
+                               partitionKeys = this.partitioningFields;
+                               sortDirection = null;
+                               partitioner = this.customPartitioner;
+                               break;
+
+                       default:
+                               throw new CompilerException("Unsupported partitioning strategy");
+               }
+
+               DataExchangeMode exMode = DataExchangeMode.select(exchangeMode, shipType, breakPipeline);
+               channel.setShipStrategy(shipType, partitionKeys, sortDirection, partitioner, exMode);
+       }
+
+       // ------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               final int prime = 31;
+               int result = 1;
+               result = prime * result + ((partitioning == null) ? 0 : partitioning.ordinal());
+               result = prime * result + ((partitioningFields == null) ? 0 : partitioningFields.hashCode());
+               result = prime * result + ((ordering == null) ? 0 : ordering.hashCode());
+               return result;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof GlobalProperties) {
+                       final GlobalProperties other = (GlobalProperties) obj;
+                       return (this.partitioning == other.partitioning)
+                               && (this.ordering == other.ordering || (this.ordering != null && this.ordering.equals(other.ordering)))
+                               && (this.partitioningFields == other.partitioningFields ||
+                                               (this.partitioningFields != null && this.partitioningFields.equals(other.partitioningFields)))
+                               && (this.uniqueFieldCombinations == other.uniqueFieldCombinations ||
+                                               (this.uniqueFieldCombinations != null && this.uniqueFieldCombinations.equals(other.uniqueFieldCombinations)));
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public String toString() {
+               final StringBuilder bld = new StringBuilder(
+                       "GlobalProperties [partitioning=" + partitioning +
+                       (this.partitioningFields == null ? "" : ", on fields " + this.partitioningFields) +
+                       (this.ordering == null ? "" : ", with ordering " + this.ordering));
+
+               if (this.uniqueFieldCombinations == null) {
+                       bld.append(']');
+               } else {
+                       bld.append(" - Unique field groups: ");
+                       bld.append(this.uniqueFieldCombinations);
+                       bld.append(']');
+               }
+               return bld.toString();
+       }
+
+       @Override
+       public GlobalProperties clone() {
+               final GlobalProperties newProps = new GlobalProperties();
+               newProps.partitioning = this.partitioning;
+               newProps.partitioningFields = this.partitioningFields;
+               newProps.ordering = this.ordering;
+               newProps.customPartitioner = this.customPartitioner;
+               newProps.uniqueFieldCombinations = this.uniqueFieldCombinations == null ? null : new HashSet<FieldSet>(this.uniqueFieldCombinations);
+               return newProps;
+       }
+       
+       // --------------------------------------------------------------------------------------------
+
+       public static GlobalProperties combine(GlobalProperties gp1, GlobalProperties gp2) {
+               if (gp1.isFullyReplicated()) {
+                       if (gp2.isFullyReplicated()) {
+                               return new GlobalProperties();
+                       } else {
+                               return gp2;
+                       }
+               } else if (gp2.isFullyReplicated()) {
+                       return gp1;
+               } else if (gp1.ordering != null) {
+                       return gp1;
+               } else if (gp2.ordering != null) {
+                       return gp2;
+               } else if (gp1.partitioningFields != null) {
+                       return gp1;
+               } else if (gp2.partitioningFields != null) {
+                       return gp2;
+               } else if (gp1.uniqueFieldCombinations != null) {
+                       return gp1;
+               } else if (gp2.uniqueFieldCombinations != null) {
+                       return gp2;
+               } else if (gp1.getPartitioning().isPartitioned()) {
+                       return gp1;
+               } else if (gp2.getPartitioning().isPartitioned()) {
+                       return gp2;
+               } else {
+                       return gp1;
+               }
+       }
+}
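
The two filterBySemanticProperties loops above implement one remapping rule in two places. As a compact restatement, here is a minimal standalone sketch of that rule; it is deliberately not Flink API (the method name remapPartitioningFields and the map-based forwarding table are hypothetical stand-ins for SemanticProperties.getForwardingTargetFields): if any partitioning field is not forwarded to the output, the partitioning property is lost entirely, and a field forwarded to several targets keeps only the lowest target index.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.SortedSet;

    public class ForwardingSketch {

        /**
         * Hypothetical stand-in for the remapping loops in
         * GlobalProperties.filterBySemanticProperties. Returns null when the
         * partitioning cannot survive the operator, mirroring how the code
         * above nulls out newPartitioningFields.
         */
        static List<Integer> remapPartitioningFields(
                List<Integer> partitioningFields,
                Map<Integer, SortedSet<Integer>> forwardedTo) {

            List<Integer> remapped = new ArrayList<Integer>();
            for (int sourceField : partitioningFields) {
                SortedSet<Integer> targets = forwardedTo.get(sourceField);
                if (targets == null || targets.isEmpty()) {
                    // one non-forwarded field destroys the whole partitioning
                    return null;
                }
                // multiple targets: keep only the lowest index, as the LOG.warn above notes
                remapped.add(targets.first());
            }
            return remapped;
        }
    }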

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java
new file mode 100644
index 0000000..6946641
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+
+/**
+ * Interesting properties are propagated from parent operators to child operators. They tell the child
+ * what data properties would allow the parent to operate more cheaply. A reduce operator, for
+ * example, tells its child that partitioned data would help. If the child is a join operator, it can use
+ * that knowledge to favor strategies that leave the data in a partitioned form.
+ *
+ * More on optimization with interesting properties can be found in the literature on
+ * the Volcano and Cascades optimizer frameworks.
+ */
+public class InterestingProperties implements Cloneable {
+
+       private Set<RequestedGlobalProperties> globalProps; // the global properties, i.e. properties across partitions
+
+       private Set<RequestedLocalProperties> localProps; // the local properties, i.e. properties within partitions
+
+       // ------------------------------------------------------------------------
+
+       public InterestingProperties() {
+               this.globalProps = new HashSet<RequestedGlobalProperties>();
+               this.localProps = new HashSet<RequestedLocalProperties>();
+       }
+
+       /**
+        * Private constructor for cloning purposes.
+        * 
+        * @param globalProps  The global properties for this new object.
+        * @param localProps The local properties for this new object.
+        */
+       private InterestingProperties(Set<RequestedGlobalProperties> globalProps, Set<RequestedLocalProperties> localProps) {
+               this.globalProps = globalProps;
+               this.localProps = localProps;
+       }
+
+       // ------------------------------------------------------------------------
+
+       public void addGlobalProperties(RequestedGlobalProperties props) {
+               this.globalProps.add(props);
+       }
+       
+       public void addLocalProperties(RequestedLocalProperties props) {
+               this.localProps.add(props);
+       }
+       
+       public void addInterestingProperties(InterestingProperties other) {
+               this.globalProps.addAll(other.globalProps);
+               this.localProps.addAll(other.localProps);
+       }
+
+       /**
+        * Gets the interesting local properties.
+        * 
+        * @return The interesting local properties.
+        */
+       public Set<RequestedLocalProperties> getLocalProperties() {
+               return this.localProps;
+       }
+
+       /**
+        * Gets the interesting global properties.
+        * 
+        * @return The interesting global properties.
+        */
+       public Set<RequestedGlobalProperties> getGlobalProperties() {
+               return this.globalProps;
+       }
+
+       public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input) {
+               InterestingProperties iProps = new InterestingProperties();
+               SemanticProperties props;
+               if (node instanceof SingleInputNode || node instanceof TwoInputNode) {
+                       props = node.getSemanticProperties();
+               } else {
+                       props = new SemanticProperties.EmptySemanticProperties();
+               }
+
+               for (RequestedGlobalProperties rgp : this.globalProps) {
+                       RequestedGlobalProperties filtered = rgp.filterBySemanticProperties(props, input);
+                       if (filtered != null && !filtered.isTrivial()) {
+                               iProps.addGlobalProperties(filtered);
+                       }
+               }
+               for (RequestedLocalProperties rlp : this.localProps) {
+                       RequestedLocalProperties filtered = rlp.filterBySemanticProperties(props, input);
+                       if (filtered != null && !filtered.isTrivial()) {
+                               iProps.addLocalProperties(filtered);
+                       }
+               }
+               return iProps;
+       }
+       
+       public void dropTrivials() {
+               for (Iterator<RequestedGlobalProperties> iter = this.globalProps.iterator(); iter.hasNext();) {
+                       RequestedGlobalProperties gp = iter.next();
+                       if (gp.isTrivial()) {
+                               iter.remove();
+                               break;
+                       }
+               }
+
+               for (Iterator<RequestedLocalProperties> iter = this.localProps.iterator(); iter.hasNext();) {
+                       RequestedLocalProperties lp = iter.next();
+                       if (lp.isTrivial()) {
+                               iter.remove();
+                               break;
+                       }
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       
+       @Override
+       public int hashCode() {
+               final int prime = 31;
+               int result = 1;
+               result = prime * result + ((globalProps == null) ? 0 : globalProps.hashCode());
+               result = prime * result + ((localProps == null) ? 0 : localProps.hashCode());
+               return result;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof InterestingProperties) {
+                       InterestingProperties other = (InterestingProperties) obj;
+                       return this.globalProps.equals(other.globalProps) &&
+                                       this.localProps.equals(other.localProps);
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public String toString() {
+               return "InterestingProperties [globalProps=" + this.globalProps +
+                               ", localProps=" + this.localProps + " ]";
+       }
+
+       @Override
+       public InterestingProperties clone() {
+               HashSet<RequestedGlobalProperties> globalProps = new HashSet<RequestedGlobalProperties>();
+               for (RequestedGlobalProperties p : this.globalProps) {
+                       globalProps.add(p.clone());
+               }
+               HashSet<RequestedLocalProperties> localProps = new HashSet<RequestedLocalProperties>();
+               for (RequestedLocalProperties p : this.localProps) {
+                       localProps.add(p.clone());
+               }
+
+               return new InterestingProperties(globalProps, localProps);
+       }
+}
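
For orientation, a minimal usage sketch of the class above (the class and method names are those introduced by this commit; the main scaffolding and the concrete field indexes are illustrative only). A reduce-like parent registers a non-trivial request for hash partitioning, a map-like parent contributes only the trivial default request, and dropTrivials() removes the latter from the merged set:

    import org.apache.flink.api.common.operators.util.FieldSet;
    import org.apache.flink.optimizer.dataproperties.InterestingProperties;
    import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;

    public class InterestingPropertiesSketch {

        public static void main(String[] args) {
            // a reduce-like parent asks for hash partitioning on field 0
            RequestedGlobalProperties partitioned = new RequestedGlobalProperties();
            partitioned.setHashPartitioned(new FieldSet().addField(0));

            InterestingProperties fromReduce = new InterestingProperties();
            fromReduce.addGlobalProperties(partitioned);

            // a map-like parent has no requirements: the default request is trivial
            InterestingProperties fromMap = new InterestingProperties();
            fromMap.addGlobalProperties(new RequestedGlobalProperties());

            // merging unions the requests of both parents ...
            InterestingProperties merged = new InterestingProperties();
            merged.addInterestingProperties(fromReduce);
            merged.addInterestingProperties(fromMap);

            // ... and the trivial request is discarded again
            merged.dropTrivials();
            System.out.println(merged); // only the hash-partitioning request remains
        }
    }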

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java
new file mode 100644
index 0000000..e0231aa
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.dataproperties;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class represents local properties of the data. A local property is a property that exists
+ * within the data of a single partition, such as sort order, or data grouping.
+ */
+public class LocalProperties implements Cloneable {
+
+       public static final Logger LOG = LoggerFactory.getLogger(LocalProperties.class);
+
+       public static final LocalProperties EMPTY = new LocalProperties();
+
+       // --------------------------------------------------------------------------------------------
+
+       private Ordering ordering;                      // order inside a partition, null if not ordered
+
+       private FieldList groupedFields;                // fields by which the stream is grouped. null if not grouped.
+
+       private Set<FieldSet> uniqueFields;             // fields whose value combination is unique in the stream
+
+       // --------------------------------------------------------------------------------------------
+       
+       /**
+        * Default constructor for trivial local properties. No order, no grouping, no uniqueness.
+        */
+       public LocalProperties() {}
+
+       // --------------------------------------------------------------------------------------------
+       
+       /**
+        * Gets the key order.
+        * 
+        * @return The key order, or <code>null</code> if nothing is ordered.
+        */
+       public Ordering getOrdering() {
+               return ordering;
+       }
+       
+       /**
+        * Gets the grouped fields.
+        * 
+        * @return The grouped fields, or <code>null</code> if nothing is grouped.
+        */
+       public FieldList getGroupedFields() {
+               return this.groupedFields;
+       }
+
+       /**
+        * Gets the fields whose combination is unique within the data set.
+        *
+        * @return The unique field combination, or <code>null</code> if nothing is unique.
+        */
+       public Set<FieldSet> getUniqueFields() {
+               return this.uniqueFields;
+       }
+
+       /**
+        * Checks whether the given set of fields is unique, as specified in these local properties.
+        *
+        * @param set The set to check.
+        * @return True, if the given column combination is unique, false if not.
+        */
+       public boolean areFieldsUnique(FieldSet set) {
+               return this.uniqueFields != null && this.uniqueFields.contains(set);
+       }
+       
+       /**
+        * Adds a combination of fields that are unique in these data properties.
+        *
+        * @param uniqueFields The fields that are unique in these data properties.
+        */
+       public LocalProperties addUniqueFields(FieldSet uniqueFields) {
+               LocalProperties copy = clone();
+
+               if (copy.uniqueFields == null) {
+                       copy.uniqueFields = new HashSet<FieldSet>();
+               }
+               copy.uniqueFields.add(uniqueFields);
+               return copy;
+       }
+       
+       public LocalProperties clearUniqueFieldSets() {
+               if (this.uniqueFields == null || this.uniqueFields.isEmpty()) {
+                       return this;
+               } else {
+                       LocalProperties copy = new LocalProperties();
+                       copy.ordering = this.ordering;
+                       copy.groupedFields = this.groupedFields;
+                       return copy;
+               }
+       }
+       
+       /**
+        * Checks, if the properties in this object are trivial, i.e. only standard values.
+        */
+       public boolean isTrivial() {
+               return ordering == null && this.groupedFields == null && this.uniqueFields == null;
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Filters these LocalProperties by the fields that are forwarded to the output
+        * as described by the SemanticProperties.
+        *
+        * @param props The semantic properties holding information about forwarded fields.
+        * @param input The index of the input.
+        * @return The filtered LocalProperties
+        */
+       public LocalProperties filterBySemanticProperties(SemanticProperties props, int input) {
+
+               if (props == null) {
+                       throw new NullPointerException("SemanticProperties may not be null.");
+               }
+
+               LocalProperties returnProps = new LocalProperties();
+
+               // check if sorting is preserved
+               if (this.ordering != null) {
+                       Ordering newOrdering = new Ordering();
+
+                       for (int i = 0; i < this.ordering.getInvolvedIndexes().size(); i++) {
+                               int sourceField = this.ordering.getInvolvedIndexes().get(i);
+                               FieldSet targetField = props.getForwardingTargetFields(input, sourceField);
+                               if (targetField == null || targetField.size() == 0) {
+                                       if (i == 0) {
+                                               // order fully destroyed
+                                               newOrdering = null;
+                                               break;
+                                       } else {
+                                               // order partially preserved
+                                               break;
+                                       }
+                               } else {
+                                       // use any field of target fields for now.  We should use something like field equivalence sets in the future.
+                                       if (targetField.size() > 1) {
+                                               LOG.warn("Found that a field is forwarded to more than one target field in " +
+                                                               "semantic forwarded field information. Will only use the field with the lowest index.");
+                                       }
+                                       newOrdering.appendOrdering(targetField.toArray()[0], this.ordering.getType(i), this.ordering.getOrder(i));
+                               }
+                       }
+
+                       returnProps.ordering = newOrdering;
+                       if (newOrdering != null) {
+                               returnProps.groupedFields = newOrdering.getInvolvedIndexes();
+                       } else {
+                               returnProps.groupedFields = null;
+                       }
+               }
+               // check if grouping is preserved
+               else if (this.groupedFields != null) {
+                       FieldList newGroupedFields = new FieldList();
+
+                       for (Integer sourceField : this.groupedFields) {
+                               FieldSet targetField = props.getForwardingTargetFields(input, sourceField);
+                               if (targetField == null || targetField.size() == 0) {
+                                       newGroupedFields = null;
+                                       break;
+                               } else {
+                                       // use any field of target fields for now.  We should use something like field equivalence sets in the future.
+                                       if (targetField.size() > 1) {
+                                               LOG.warn("Found that a field is forwarded to more than one target field in " +
+                                                               "semantic forwarded field information. Will only use the field with the lowest index.");
+                                       }
+                                       newGroupedFields = newGroupedFields.addField(targetField.toArray()[0]);
+                               }
+                       }
+                       returnProps.groupedFields = newGroupedFields;
+               }
+
+               if (this.uniqueFields != null) {
+                       Set<FieldSet> newUniqueFields = new HashSet<FieldSet>();
+                       for (FieldSet fields : this.uniqueFields) {
+                               FieldSet newFields = new FieldSet();
+                               for (Integer sourceField : fields) {
+                                       FieldSet targetField = props.getForwardingTargetFields(input, sourceField);
+
+                                       if (targetField == null || targetField.size() == 0) {
+                                               newFields = null;
+                                               break;
+                                       } else {
+                                               // use any field of target fields for now.  We should use something like field equivalence sets in the future.
+                                               if (targetField.size() > 1) {
+                                                       LOG.warn("Found that a field is forwarded to more than one target field in " +
+                                                                       "semantic forwarded field information. Will only use the field with the lowest index.");
+                                               }
+                                               newFields = newFields.addField(targetField.toArray()[0]);
+                                       }
+                               }
+                               if (newFields != null) {
+                                       newUniqueFields.add(newFields);
+                               }
+                       }
+
+                       if (!newUniqueFields.isEmpty()) {
+                               returnProps.uniqueFields = newUniqueFields;
+                       } else {
+                               returnProps.uniqueFields = null;
+                       }
+               }
+
+               return returnProps;
+       }
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               final int prime = 31;
+               int result = 1;
+               result = prime * result + (this.ordering == null ? 0 : this.ordering.hashCode());
+               result = prime * result + (this.groupedFields == null ? 0 : this.groupedFields.hashCode());
+               result = prime * result + (this.uniqueFields == null ? 0 : this.uniqueFields.hashCode());
+               return result;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof LocalProperties) {
+                       final LocalProperties other = (LocalProperties) obj;
+                       return (ordering == other.getOrdering() || (ordering != null && ordering.equals(other.getOrdering()))) &&
+                               (groupedFields == other.getGroupedFields() || (groupedFields != null && groupedFields.equals(other.getGroupedFields()))) &&
+                               (uniqueFields == other.getUniqueFields() || (uniqueFields != null && uniqueFields.equals(other.getUniqueFields())));
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public String toString() {
+               return "LocalProperties [ordering=" + this.ordering + ", grouped=" + this.groupedFields
+                       + ", unique=" + this.uniqueFields + "]";
+       }
+
+       @Override
+       public LocalProperties clone() {
+               LocalProperties copy = new LocalProperties();
+               copy.ordering = this.ordering;
+               copy.groupedFields = this.groupedFields;
+               copy.uniqueFields = (this.uniqueFields == null ? null : new HashSet<FieldSet>(this.uniqueFields));
+               return copy;
+       }
+       
+       // --------------------------------------------------------------------------------------------
+
+       public static LocalProperties combine(LocalProperties lp1, LocalProperties lp2) {
+               if (lp1.ordering != null) {
+                       return lp1;
+               } else if (lp2.ordering != null) {
+                       return lp2;
+               } else if (lp1.groupedFields != null) {
+                       return lp1;
+               } else if (lp2.groupedFields != null) {
+                       return lp2;
+               } else if (lp1.uniqueFields != null && !lp1.uniqueFields.isEmpty()) {
+                       return lp1;
+               } else if (lp2.uniqueFields != null && !lp2.uniqueFields.isEmpty()) {
+                       return lp2;
+               } else {
+                       return lp1;
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+       
+       public static LocalProperties forOrdering(Ordering o) {
+               LocalProperties props = new LocalProperties();
+               props.ordering = o;
+               props.groupedFields = o.getInvolvedIndexes();
+               return props;
+       }
+       
+       public static LocalProperties forGrouping(FieldList groupedFields) {
+               LocalProperties props = new LocalProperties();
+               props.groupedFields = groupedFields;
+               return props;
+       }
+}
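
The factory methods and the precedence order in combine are easy to illustrate. The following sketch assumes the Ordering(index, type, order) convenience constructor from flink-core; everything else appears above. forOrdering records the implied grouping alongside the order, and combine prefers an ordered input over a merely grouped one:

    import org.apache.flink.api.common.operators.Order;
    import org.apache.flink.api.common.operators.Ordering;
    import org.apache.flink.api.common.operators.util.FieldList;
    import org.apache.flink.optimizer.dataproperties.LocalProperties;

    public class LocalPropertiesSketch {

        public static void main(String[] args) {
            // sorted on field 0 ascending; this implies grouping on field 0 as well
            LocalProperties sorted = LocalProperties.forOrdering(new Ordering(0, null, Order.ASCENDING));

            // grouped on fields 0 and 1, with no order within the groups
            LocalProperties grouped = LocalProperties.forGrouping(new FieldList().addField(0).addField(1));

            // an ordering is the strongest local property, so it wins the combination
            LocalProperties combined = LocalProperties.combine(sorted, grouped);
            System.out.println(combined); // reports the ordering (and grouping) on field 0
        }
    }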

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java
new file mode 100644
index 0000000..5e06dd3
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+/**
+ * An enumeration of the different ways of distributing data across partitions or
+ * parallel workers.
+ */
+public enum PartitioningProperty {
+
+       /**
+        * Any possible way of data distribution, including random partitioning and full replication.
+        */
+       ANY_DISTRIBUTION,
+
+       /**
+        * A random disjunct (non-replicated) data distribution, where each datum is contained in one partition only.
+        * This is for example the result of parallel scans of data in a file system like HDFS,
+        * or the result of a round-robin data distribution.
+        */
+       RANDOM_PARTITIONED,
+
+       /**
+        * A hash partitioning on a certain key.
+        */
+       HASH_PARTITIONED,
+
+       /**
+        * A range partitioning on a certain key.
+        */
+       RANGE_PARTITIONED,
+
+       /**
+        * A not further specified partitioning on a key (hash-, or range-partitioning, or even some other scheme).
+        */
+       ANY_PARTITIONING,
+
+       /**
+        * Full replication of the data to each parallel instance.
+        */
+       FULL_REPLICATION,
+
+       /**
+        * A forced even re-balancing. All partitions are guaranteed to have almost the same number of records.
+        */
+       FORCED_REBALANCED,
+
+       /**
+        * A custom partitioning, accompanied by a {@link org.apache.flink.api.common.functions.Partitioner}.
+        */
+       CUSTOM_PARTITIONING;
+       
+       /**
+        * Checks, if this property represents in fact a partitioning. That is,
+        * whether this property is not equal to <tt>FULL_REPLICATION</tt>,
+        * <tt>FORCED_REBALANCED</tt>, or <tt>ANY_DISTRIBUTION</tt>.
+        *
+        * @return True, if this enum constant represents a partitioning, false otherwise.
+        */
+       public boolean isPartitioned() {
+               return this != FULL_REPLICATION && this != FORCED_REBALANCED && this != ANY_DISTRIBUTION;
+       }
+
+       /**
+        * Checks, if this property represents a full replication.
+        *
+        * @return True, if this enum constant is equal to <tt>PartitionProperty.FULL_REPLICATION</tt>,
+        *         false otherwise.
+        */
+       public boolean isReplication() {
+               return this == FULL_REPLICATION;
+       }
+
+       /**
+        * Checks if this property presents a partitioning that is not random, but on a partitioning key.
+        *
+        * @return True, if the data is partitioned on a key.
+        */
+       public boolean isPartitionedOnKey() {
+               return isPartitioned() && this != RANDOM_PARTITIONED;
+       }
+
+       /**
+        * Checks, if this property represents a partitioning that is computable.
+        * A computable partitioning can be recreated through an algorithm. If two sets of data are to
+        * be co-partitioned, it is crucial that the partitioning schemes are computable.
+        * <p>
+        * Examples for computable partitioning schemes are hash- or range-partitioning. An example for a
+        * non-computable partitioning is the implicit partitioning that exists through a globally unique key.
+        *
+        * @return True, if this enum constant is a re-computable partitioning.
+        */
+       public boolean isComputablyPartitioned() {
+               return this == HASH_PARTITIONED || this == RANGE_PARTITIONED || this == CUSTOM_PARTITIONING;
+       }
+}
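
The predicates above slice the constants in slightly non-obvious ways; FORCED_REBALANCED, for example, is not "partitioned" in the sense of isPartitioned(). A small sketch that tabulates the predicate results for every constant (using only the enum and methods shown above) makes the matrix explicit:

    import org.apache.flink.optimizer.dataproperties.PartitioningProperty;

    public class PartitioningPropertySketch {

        public static void main(String[] args) {
            // print one row of predicate results per enum constant
            for (PartitioningProperty p : PartitioningProperty.values()) {
                System.out.printf("%-20s partitioned=%-5b onKey=%-5b replication=%-5b computable=%b%n",
                        p, p.isPartitioned(), p.isPartitionedOnKey(), p.isReplication(),
                        p.isComputablyPartitioned());
            }
        }
    }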

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java
new file mode 100644
index 0000000..8c3f6bd
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.util.Utils;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+/**
+ * This class represents the global properties of the data that are requested by an operator.
+ * Operators request the global properties they need for correct execution. This list is an example of
+ * global properties requested by certain operators:
+ * <ul>
+ *     <li>"groupBy/reduce" will request the data to be partitioned in some way on the key fields.</li>
+ *     <li>"map" will request the data to be in an arbitrary distribution - it has no prerequisites.</li>
+ *     <li>"join" will request certain properties for each input. This class represents the properties
+ *         on an input alone. The properties may be partitioning on the key fields, or a combination of
+ *         replication on one input and anything-but-replication on the other input.</li>
+ * </ul>
+ */
+public final class RequestedGlobalProperties implements Cloneable {
+
+       private PartitioningProperty partitioning;      // the type of partitioning
+
+       private FieldSet partitioningFields;            // the fields which are partitioned
+
+       private Ordering ordering;                      // order of the partitioned fields, if it is an ordered (range) partitioning
+
+       private DataDistribution dataDistribution;      // optional data distribution, for a range partitioning
+
+       private Partitioner<?> customPartitioner;       // optional, partitioner for custom partitioning
+
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Initializes the global properties with no partitioning.
+        */
+       public RequestedGlobalProperties() {
+               this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       
+       /**
+        * Sets these properties to request a hash partitioning on the given fields.
+        *
+        * If the fields are provided as a {@link FieldSet}, then any permutation of the fields is a
+        * valid partitioning, including subsets. If the fields are given as a {@link FieldList},
+        * then only an exact partitioning on the fields matches this requested partitioning.
+        *
+        * @param partitionedFields The key fields for the partitioning.
+        */
+       public void setHashPartitioned(FieldSet partitionedFields) {
+               if (partitionedFields == null) {
+                       throw new NullPointerException();
+               }
+               this.partitioning = PartitioningProperty.HASH_PARTITIONED;
+               this.partitioningFields = partitionedFields;
+               this.ordering = null;
+       }
+       
+
+       public void setRangePartitioned(Ordering ordering) {
+               this.setRangePartitioned(ordering, null);
+       }
+       
+       public void setRangePartitioned(Ordering ordering, DataDistribution dataDistribution) {
+               if (ordering == null) {
+                       throw new NullPointerException();
+               }
+               this.partitioning = PartitioningProperty.RANGE_PARTITIONED;
+               this.ordering = ordering;
+               this.partitioningFields = null;
+               this.dataDistribution = dataDistribution;
+       }
+
+       /**
+        * Sets these properties to request some partitioning on the given fields. This will allow
+        * both hash partitioning and range partitioning to match.
+        *
+        * If the fields are provided as a {@link FieldSet}, then any permutation of the fields is a
+        * valid partitioning, including subsets. If the fields are given as a {@link FieldList},
+        * then only an exact partitioning on the fields matches this requested partitioning.
+        *
+        * @param partitionedFields The key fields for the partitioning.
+        */
+       public void setAnyPartitioning(FieldSet partitionedFields) {
+               if (partitionedFields == null) {
+                       throw new NullPointerException();
+               }
+               this.partitioning = PartitioningProperty.ANY_PARTITIONING;
+               this.partitioningFields = partitionedFields;
+               this.ordering = null;
+       }
+       
+       public void setRandomPartitioning() {
+               this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
+               this.partitioningFields = null;
+               this.ordering = null;
+       }
+
+       public void setAnyDistribution() {
+               this.partitioning = PartitioningProperty.ANY_DISTRIBUTION;
+               this.partitioningFields = null;
+               this.ordering = null;
+       }
+       
+       public void setFullyReplicated() {
+               this.partitioning = PartitioningProperty.FULL_REPLICATION;
+               this.partitioningFields = null;
+               this.ordering = null;
+       }
+       
+       public void setForceRebalancing() {
+               this.partitioning = PartitioningProperty.FORCED_REBALANCED;
+               this.partitioningFields = null;
+               this.ordering = null;
+       }
+
+       /**
+        * Sets these properties to request a custom partitioning with the given {@link Partitioner} instance.
+        *
+        * If the fields are provided as a {@link FieldSet}, then any permutation of the fields is a
+        * valid partitioning, including subsets. If the fields are given as a {@link FieldList},
+        * then only an exact partitioning on the fields matches this requested partitioning.
+        *
+        * @param partitionedFields The key fields for the partitioning.
+        * @param partitioner The custom partitioner to be used.
+        */
+       public void setCustomPartitioned(FieldSet partitionedFields, Partitioner<?> partitioner) {
+               if (partitionedFields == null || partitioner == null) {
+                       throw new NullPointerException();
+               }
+               
+               this.partitioning = PartitioningProperty.CUSTOM_PARTITIONING;
+               this.partitioningFields = partitionedFields;
+               this.ordering = null;
+               this.customPartitioner = partitioner;
+       }
+
+       /**
+        * Gets the partitioning property.
+        * 
+        * @return The partitioning property.
+        */
+       public PartitioningProperty getPartitioning() {
+               return partitioning;
+       }
+       
+       /**
+        * Gets the fields on which the data is partitioned.
+        * 
+        * @return The partitioning fields.
+        */
+       public FieldSet getPartitionedFields() {
+               return this.partitioningFields;
+       }
+
+       /**
+        * Gets the key order.
+        * 
+        * @return The key order.
+        */
+       public Ordering getOrdering() {
+               return this.ordering;
+       }
+       
+       /**
+        * Gets the data distribution.
+        * 
+        * @return The data distribution.
+        */
+       public DataDistribution getDataDistribution() {
+               return this.dataDistribution;
+       }
+       
+       /**
+        * Gets the custom partitioner associated with these properties.
+        * 
+        * @return The custom partitioner associated with these properties.
+        */
+       public Partitioner<?> getCustomPartitioner() {
+               return customPartitioner;
+       }
+
+       /**
+        * Checks, if the properties in this object are trivial, i.e. only standard values.
+        */
+       public boolean isTrivial() {
+               return this.partitioning == null || this.partitioning == PartitioningProperty.RANDOM_PARTITIONED;
+       }
+
+       /**
+        * This method resets the properties to a state where no properties are given.
+        */
+       public void reset() {
+               this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
+               this.ordering = null;
+               this.partitioningFields = null;
+               this.dataDistribution = null;
+               this.customPartitioner = null;
+       }
+
+       /**
+        * Filters these properties by what can be preserved by the given SemanticProperties when propagated down
+        * to the given input.
+        *
+        * @param props The SemanticProperties which define which fields are preserved.
+        * @param input The index of the operator's input.
+        * @return The filtered RequestedGlobalProperties
+        */
+       public RequestedGlobalProperties filterBySemanticProperties(SemanticProperties props, int input) {
+               // no semantic properties available. All global properties are filtered.
+               if (props == null) {
+                       throw new NullPointerException("SemanticProperties may not be null.");
+               }
+
+               RequestedGlobalProperties rgProp = new RequestedGlobalProperties();
+
+               switch (this.partitioning) {
+                       case FULL_REPLICATION:
+                       case FORCED_REBALANCED:
+                       case CUSTOM_PARTITIONING:
+                       case RANDOM_PARTITIONED:
+                       case ANY_DISTRIBUTION:
+                               // make sure that certain properties are not pushed down
+                               return null;
+                       case HASH_PARTITIONED:
+                       case ANY_PARTITIONING:
+                               FieldSet newFields;
+                               if (this.partitioningFields instanceof FieldList) {
+                                       newFields = new FieldList();
+                               } else {
+                                       newFields = new FieldSet();
+                               }
+
+                               for (Integer targetField : this.partitioningFields) {
+                                       int sourceField = props.getForwardingSourceField(input, targetField);
+                                       if (sourceField >= 0) {
+                                               newFields = newFields.addField(sourceField);
+                                       } else {
+                                               // partial partitionings are not preserved to avoid skewed partitioning
+                                               return null;
+                                       }
+                               }
+                               rgProp.partitioning = this.partitioning;
+                               rgProp.partitioningFields = newFields;
+                               return rgProp;
+                       case RANGE_PARTITIONED:
+                               // range partitioning
+                               Ordering newOrdering = new Ordering();
+                               for (int i = 0; i < this.ordering.getInvolvedIndexes().size(); i++) {
+                                       int value = this.ordering.getInvolvedIndexes().get(i);
+                                       int sourceField = props.getForwardingSourceField(input, value);
+                                       if (sourceField >= 0) {
+                                               newOrdering.appendOrdering(sourceField, this.ordering.getType(i), this.ordering.getOrder(i));
+                                       } else {
+                                               return null;
+                                       }
+                               }
+                               rgProp.partitioning = this.partitioning;
+                               rgProp.ordering = newOrdering;
+                               rgProp.dataDistribution = this.dataDistribution;
+                               return rgProp;
+                       default:
+                               throw new RuntimeException("Unknown partitioning type encountered.");
+               }
+       }
+
+       /**
+        * Checks, if this set of interesting properties is met by the given
+        * produced properties.
+        *
+        * @param props The properties for which to check whether they meet these properties.
+        * @return True, if the properties are met, false otherwise.
+        */
+       public boolean isMetBy(GlobalProperties props) {
+               if (this.partitioning == PartitioningProperty.ANY_DISTRIBUTION) {
+                       return true;
+               } else if (this.partitioning == PartitioningProperty.FULL_REPLICATION) {
+                       return props.isFullyReplicated();
+               }
+               else if (props.isFullyReplicated()) {
+                       return false;
+               }
+               else if (this.partitioning == PartitioningProperty.RANDOM_PARTITIONED) {
+                       return true;
+               }
+               else if (this.partitioning == PartitioningProperty.ANY_PARTITIONING) {
+                       return checkCompatiblePartitioningFields(props);
+               }
+               else if (this.partitioning == PartitioningProperty.HASH_PARTITIONED) {
+                       return props.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
+                                       checkCompatiblePartitioningFields(props);
+               }
+               else if (this.partitioning == PartitioningProperty.RANGE_PARTITIONED) {
+                       return props.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
+                                       props.matchesOrderedPartitioning(this.ordering);
+               }
+               else if (this.partitioning == PartitioningProperty.FORCED_REBALANCED) {
+                       return props.getPartitioning() == PartitioningProperty.FORCED_REBALANCED;
+               }
+               else if (this.partitioning == PartitioningProperty.CUSTOM_PARTITIONING) {
+                       return props.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING &&
+                                       checkCompatiblePartitioningFields(props) &&
+                                       props.getCustomPartitioner().equals(this.customPartitioner);
+               }
+               else {
+                       throw new CompilerException("Properties matching logic leaves open cases.");
+               }
+       }
+
+       /**
+        * Parameterizes the ship strategy fields of a channel such that the channel produces
+        * the desired global properties.
+        *
+        * @param channel The channel to parameterize.
+        * @param globalDopChange Flag indicating whether the degree of parallelism changes
+        *                        between sender and receiver.
+        * @param exchangeMode The mode of data exchange (pipelined, always batch,
+        *                     batch only on shuffle, ...)
+        * @param breakPipeline Indicates whether this data exchange should break
+        *                      pipelines (unless pipelines are forced).
+        */
+       public void parameterizeChannel(Channel channel, boolean globalDopChange,
+                                                                       ExecutionMode exchangeMode, boolean breakPipeline) {
+
+               // safety check. Fully replicated input must be preserved.
+               if (channel.getSource().getGlobalProperties().isFullyReplicated() &&
+                               !(this.partitioning == PartitioningProperty.FULL_REPLICATION ||
+                                       this.partitioning == PartitioningProperty.ANY_DISTRIBUTION))
+               {
+                       throw new CompilerException("Fully replicated input must be preserved " +
+                                       "and may not be converted into another global property.");
+               }
+
+               // if we request nothing, then we need no special strategy. forward, if the number of instances remains
+               // the same, randomly repartition otherwise
+               if (isTrivial() || this.partitioning == PartitioningProperty.ANY_DISTRIBUTION) {
+                       ShipStrategyType shipStrategy = globalDopChange ? ShipStrategyType.PARTITION_RANDOM :
+                                                                                                               ShipStrategyType.FORWARD;
+
+                       DataExchangeMode em = DataExchangeMode.select(exchangeMode, shipStrategy, breakPipeline);
+                       channel.setShipStrategy(shipStrategy, em);
+                       return;
+               }
+
+               final GlobalProperties inGlobals = channel.getSource().getGlobalProperties();
+               // if we have no global parallelism change, check if we have already compatible global properties
+               if (!globalDopChange && isMetBy(inGlobals)) {
+                       DataExchangeMode em = DataExchangeMode.select(exchangeMode, ShipStrategyType.FORWARD, breakPipeline);
+                       channel.setShipStrategy(ShipStrategyType.FORWARD, em);
+                       return;
+               }
+
+               // if we fall through the conditions until here, we need to re-establish the requested global properties
+               ShipStrategyType shipType;
+               FieldList partitionKeys;
+               boolean[] sortDirection;
+               Partitioner<?> partitioner;
+
+               switch (this.partitioning) {
+                       case FULL_REPLICATION:
+                               shipType = ShipStrategyType.BROADCAST;
+                               partitionKeys = null;
+                               sortDirection = null;
+                               partitioner = null;
+                               break;
+
+                       case ANY_PARTITIONING:
+                       case HASH_PARTITIONED:
+                               shipType = ShipStrategyType.PARTITION_HASH;
+                               partitionKeys = Utils.createOrderedFromSet(this.partitioningFields);
+                               sortDirection = null;
+                               partitioner = null;
+                               break;
+
+                       case RANGE_PARTITIONED:
+                               shipType = ShipStrategyType.PARTITION_RANGE;
+                               partitionKeys = this.ordering.getInvolvedIndexes();
+                               sortDirection = this.ordering.getFieldSortDirections();
+                               partitioner = null;
+
+                               if (this.dataDistribution != null) {
+                                       channel.setDataDistribution(this.dataDistribution);
+                               }
+                               break;
+
+                       case FORCED_REBALANCED:
+                               shipType = ShipStrategyType.PARTITION_FORCED_REBALANCE;
+                               partitionKeys = null;
+                               sortDirection = null;
+                               partitioner = null;
+                               break;
+
+                       case CUSTOM_PARTITIONING:
+                               shipType = ShipStrategyType.PARTITION_CUSTOM;
+                               partitionKeys = Utils.createOrderedFromSet(this.partitioningFields);
+                               sortDirection = null;
+                               partitioner = this.customPartitioner;
+                               break;
+
+                       default:
+                               throw new CompilerException("Invalid partitioning to create through a data exchange: "
+                                                                                       + this.partitioning.name());
+               }
+
+               DataExchangeMode exMode = DataExchangeMode.select(exchangeMode, shipType, breakPipeline);
+               channel.setShipStrategy(shipType, partitionKeys, sortDirection, partitioner, exMode);
+       }
+
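For reference, once the trivial/ANY_DISTRIBUTION and already-satisfied cases have returned early, the switch above reduces to a fixed mapping from the requested partitioning to a ship strategy. A standalone sketch of that mapping (simplified and not part of this patch; it omits the partition keys, sort directions, and custom partitioner that the real method also threads through):

    import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
    import org.apache.flink.runtime.operators.shipping.ShipStrategyType;

    final class ShipStrategySketch {

        // Mirrors the switch in parameterizeChannel(): which data exchange
        // re-establishes which requested partitioning.
        static ShipStrategyType shipStrategyFor(PartitioningProperty partitioning) {
            switch (partitioning) {
                case FULL_REPLICATION:
                    return ShipStrategyType.BROADCAST;
                case ANY_PARTITIONING:
                case HASH_PARTITIONED:
                    return ShipStrategyType.PARTITION_HASH;
                case RANGE_PARTITIONED:
                    return ShipStrategyType.PARTITION_RANGE;
                case FORCED_REBALANCED:
                    return ShipStrategyType.PARTITION_FORCED_REBALANCE;
                case CUSTOM_PARTITIONING:
                    return ShipStrategyType.PARTITION_CUSTOM;
                default:
                    throw new IllegalArgumentException(
                            "No data exchange re-establishes " + partitioning.name());
            }
        }
    }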
+       // ------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               final int prime = 31;
+               int result = 1;
+               result = prime * result + ((partitioning == null) ? 0 : partitioning.ordinal());
+               result = prime * result + ((partitioningFields == null) ? 0 : partitioningFields.hashCode());
+               result = prime * result + ((ordering == null) ? 0 : ordering.hashCode());
+               return result;
+       }
+
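Note that hashCode() folds in partitioning.ordinal() rather than the enum's own hashCode(): Enum.hashCode() is identity-based and may change between JVM runs, while ordinal() is fixed by declaration order, keeping the hash deterministic. A tiny standalone illustration (class name hypothetical, not part of this patch):

    import org.apache.flink.optimizer.dataproperties.PartitioningProperty;

    public class OrdinalStability {
        public static void main(String[] args) {
            // Stable across JVM runs: determined by the enum's declaration order.
            System.out.println(PartitioningProperty.HASH_PARTITIONED.ordinal());

            // Identity-based: may print a different value on every run, so using
            // it would make the hash computed above non-deterministic.
            System.out.println(PartitioningProperty.HASH_PARTITIONED.hashCode());
        }
    }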
+       @Override
+       public boolean equals(Object obj) {
+               if (obj instanceof RequestedGlobalProperties) {
+                       RequestedGlobalProperties other = (RequestedGlobalProperties) obj;
+                       return (ordering == other.getOrdering() || (ordering != null && ordering.equals(other.getOrdering())))
+                                       && (partitioning == other.getPartitioning())
+                                       && (partitioningFields == other.partitioningFields ||
+                                                       (partitioningFields != null && partitioningFields.equals(other.getPartitionedFields())));
+               } else {
+                       return false;
+               }
+       }
+
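equals() compares exactly the three fields that hashCode() incorporates (partitioning, partitioningFields, ordering), so equal requests always hash alike and the equals/hashCode contract holds. A minimal check, assuming the setHashPartitioned(FieldSet) setter defined earlier in this file (the ContractCheck class is hypothetical, not part of this patch):

    import org.apache.flink.api.common.operators.util.FieldSet;
    import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;

    public class ContractCheck {
        public static void main(String[] args) {
            RequestedGlobalProperties a = new RequestedGlobalProperties();
            a.setHashPartitioned(new FieldSet(0, 2));

            RequestedGlobalProperties b = new RequestedGlobalProperties();
            b.setHashPartitioned(new FieldSet(0, 2));

            // Equal property requests must produce equal hash codes.
            System.out.println(a.equals(b));                  // expected: true
            System.out.println(a.hashCode() == b.hashCode()); // expected: true
        }
    }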
+       @Override
+       public String toString() {
+               return "Requested Global Properties [partitioning=" + 
partitioning + 
+                       (this.partitioningFields == null ? "" : ", on fields " 
+ this.partitioningFields) + 
+                       (this.ordering == null ? "" : ", with ordering " + 
this.ordering) + "]";
+       }
+
+       @Override
+       public RequestedGlobalProperties clone() {
+               try {
+                       return (RequestedGlobalProperties) super.clone();
+               } catch (CloneNotSupportedException cnse) {
+                       // should never happen, but propagate just in case
+                       throw new RuntimeException(cnse);
+               }
+       }
+
+       private boolean checkCompatiblePartitioningFields(GlobalProperties props) {
+               if (this.partitioningFields instanceof FieldList) {
+                       // an ordered FieldList requires strict, position-aware checking
+                       return props.isExactlyPartitionedOnFields((FieldList) this.partitioningFields);
+               } else {
+                       return props.isPartitionedOnFields(this.partitioningFields);
+               }
+       }
+}
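The helper above is what makes requests on an ordered FieldList strict (an exact, order-sensitive match via isExactlyPartitionedOnFields()), while a plain FieldSet is satisfied by any partitioning on those fields. The distinction is visible in the types themselves; a small standalone sketch (class name hypothetical, not part of this patch):

    import org.apache.flink.api.common.operators.util.FieldList;
    import org.apache.flink.api.common.operators.util.FieldSet;

    public class FieldMatchSketch {
        public static void main(String[] args) {
            // A FieldSet is unordered: {0, 1} and {1, 0} are the same request.
            System.out.println(new FieldSet(0, 1).equals(new FieldSet(1, 0)));   // true

            // A FieldList is ordered: [0, 1] and [1, 0] are different requests,
            // which is why the FieldList case demands the strict check.
            System.out.println(new FieldList(0, 1).equals(new FieldList(1, 0))); // false
        }
    }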
