http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
new file mode 100644
index 0000000..cbd58ca
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.operators.AbstractJoinDescriptor;
+import org.apache.flink.optimizer.operators.HashJoinBuildFirstProperties;
+import org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties;
+import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
+import org.apache.flink.optimizer.operators.SortMergeJoinDescriptor;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * The Optimizer representation of a join operator.
+ */
+public class JoinNode extends TwoInputNode {
+       
+       private List<OperatorDescriptorDual> dataProperties;
+       
+       /**
+        * Creates a new JoinNode for the given join operator.
+        * 
+        * @param joinOperatorBase The join operator object.
+        */
+       public JoinNode(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase) {
+               super(joinOperatorBase);
+               
+               this.dataProperties = getDataProperties(joinOperatorBase,
+                               joinOperatorBase.getJoinHint(), joinOperatorBase.getCustomPartitioner());
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Gets the contract object for this join node.
+        * 
+        * @return The contract.
+        */
+       @Override
+       public JoinOperatorBase<?, ?, ?, ?> getOperator() {
+               return (JoinOperatorBase<?, ?, ?, ?>) super.getOperator();
+       }
+
+       @Override
+       public String getName() {
+               return "Join";
+       }
+
+       @Override
+       protected List<OperatorDescriptorDual> getPossibleProperties() {
+               return this.dataProperties;
+       }
+       
+       public void makeJoinWithSolutionSet(int solutionsetInputIndex) {
+               OperatorDescriptorDual op;
+               if (solutionsetInputIndex == 0) {
+                       op = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
+               } else if (solutionsetInputIndex == 1) {
+                       op = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
+               } else {
+                       throw new IllegalArgumentException();
+               }
+               
+               this.dataProperties = Collections.singletonList(op);
+       }
+       
+       /**
+        * The default estimates build on the principle of inclusion: The smaller input key domain is included in the larger
+        * input key domain. We also assume that every key from the larger input has one join partner in the smaller input.
+        * The result cardinality is hence the larger one.
+        */
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+               long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
+               long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
+               this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : Math.max(card1, card2);
+               
+               if (this.estimatedNumRecords >= 0) {
+                       float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
+                       float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
+                       float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;
+                       
+                       if (width > 0) {
+                               this.estimatedOutputSize = (long) (width * this.estimatedNumRecords);
+                       }
+               }
+       }
+       
+       private List<OperatorDescriptorDual> getDataProperties(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint,
+                       Partitioner<?> customPartitioner)
+       {
+               // see if an internal hint dictates the strategy to use
+               Configuration conf = joinOperatorBase.getParameters();
+               String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
+
+               if (localStrategy != null) {
+                       final AbstractJoinDescriptor fixedDriverStrat;
+                       if (Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) ||
+                               Optimizer.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) ||
+                               Optimizer.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) ||
+                               Optimizer.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) )
+                       {
+                               fixedDriverStrat = new SortMergeJoinDescriptor(this.keys1, this.keys2);
+                       }
+                       else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) {
+                               fixedDriverStrat = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
+                       }
+                       else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) {
+                               fixedDriverStrat = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
+                       }
+                       else {
+                               throw new CompilerException("Invalid local strategy hint for match contract: " + localStrategy);
+                       }
+                       
+                       if (customPartitioner != null) {
+                               fixedDriverStrat.setCustomPartitioner(customPartitioner);
+                       }
+                       
+                       ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
+                       list.add(fixedDriverStrat);
+                       return list;
+               }
+               else {
+                       ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
+                       
+                       joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint;
+                       
+                       switch (joinHint) {
+                               case BROADCAST_HASH_FIRST:
+                                       list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, true, false, false));
+                                       break;
+                               case BROADCAST_HASH_SECOND:
+                                       list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, true, false));
+                                       break;
+                               case REPARTITION_HASH_FIRST:
+                                       list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, false, false, true));
+                                       break;
+                               case REPARTITION_HASH_SECOND:
+                                       list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true));
+                                       break;
+                               case REPARTITION_SORT_MERGE:
+                                       list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2, false, false, true));
+                                       break;
+                               case OPTIMIZER_CHOOSES:
+                                       list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2));
+                                       list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2));
+                                       list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2));
+                                       break;
+                               default:
+                                       throw new CompilerException("Unrecognized join hint: " + joinHint);
+                       }
+                       
+                       if (customPartitioner != null) {
+                               for (OperatorDescriptorDual descr : list) {
+                                       ((AbstractJoinDescriptor) descr).setCustomPartitioner(customPartitioner);
+                               }
+                       }
+                       
+                       return list;
+               }
+       }
+}

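The descriptor lists that getDataProperties() assembles above correspond directly to the
join hints a user can supply through the API. A minimal sketch, assuming the
DataSet.join(DataSet, JoinHint) variant of the Java API (the input data is made up):

    import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class JoinHintExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<Long, String>> large = env.fromElements(
                    new Tuple2<Long, String>(1L, "a"), new Tuple2<Long, String>(2L, "b"));
            DataSet<Tuple2<Long, Double>> small = env.fromElements(
                    new Tuple2<Long, Double>(1L, 0.5));

            // Broadcast the second input and build the hash table from it. With this
            // hint, JoinNode enumerates only the HashJoinBuildSecondProperties
            // descriptor instead of the three strategies under OPTIMIZER_CHOOSES.
            large.join(small, JoinHint.BROADCAST_HASH_SECOND)
                 .where(0)
                 .equalTo(0)
                 .print();
        }
    }

Depending on the Flink version, print() either triggers execution itself or requires a
subsequent env.execute() call.
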
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapNode.java
new file mode 100644
index 0000000..35def59
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapNode.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.MapDescriptor;
+import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+
+/**
+ * The optimizer's internal representation of a <i>Map</i> operator node.
+ */
+public class MapNode extends SingleInputNode {
+       
+       private final List<OperatorDescriptorSingle> possibleProperties;
+       
+       /**
+        * Creates a new MapNode for the given operator.
+        * 
+        * @param operator The map operator.
+        */
+       public MapNode(SingleInputOperator<?, ?, ?> operator) {
+               super(operator);
+               
+               this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new MapDescriptor());
+       }
+
+       @Override
+       public String getName() {
+               return "Map";
+       }
+
+       @Override
+       protected List<OperatorDescriptorSingle> getPossibleProperties() {
+               return this.possibleProperties;
+       }
+
+       /**
+        * Computes the estimates for the Map operator.
+        * We assume that by default, Map takes one value and transforms it into another value.
+        * The cardinality consequently stays the same.
+        */
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+               this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
+       }
+}

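Note that MapNode forwards only the record count and not the output size: a map function
may change the record width arbitrarily, so a byte estimate derived from the input would
be unreliable. A hypothetical variant for an operator known to preserve record width
could forward both estimates, the way NoOpNode further below does:

    // Sketch only (not part of this commit): a width-preserving unary node.
    @Override
    protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
        this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
        // safe only if the function cannot change the serialized record size
        this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
    }
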
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
new file mode 100644
index 0000000..b287c33
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.MapPartitionDescriptor;
+import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+
+/**
+ * The optimizer's internal representation of a <i>MapPartition</i> operator node.
+ */
+public class MapPartitionNode extends SingleInputNode {
+       
+       private final List<OperatorDescriptorSingle> possibleProperties;
+       
+       /**
+        * Creates a new MapPartitionNode for the given contract.
+        * 
+        * @param operator The map partition contract object.
+        */
+       public MapPartitionNode(SingleInputOperator<?, ?, ?> operator) {
+               super(operator);
+               
+               this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new MapPartitionDescriptor());
+       }
+
+       @Override
+       public String getName() {
+               return "MapPartition";
+       }
+
+       @Override
+       protected List<OperatorDescriptorSingle> getPossibleProperties() {
+               return this.possibleProperties;
+       }
+
+       /**
+        * Computes the estimates for the MapPartition operator.
+        * Since a MapPartition function may emit an arbitrary number of records per partition,
+        * no default cardinality estimate can be made.
+        */
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+               // we really cannot make any estimates here
+       }
+}

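The empty method body is deliberate: a MapPartition function sees an entire partition at
once and may emit any number of records, so there is no sensible default relation between
input and output cardinality. A small sketch of such a function, assuming the standard
MapPartitionFunction interface (the types are chosen for illustration):

    import org.apache.flink.api.common.functions.MapPartitionFunction;
    import org.apache.flink.util.Collector;

    // Emits exactly one record per partition, however many records come in --
    // the output cardinality depends on the partitioning, not on the input size.
    public class CountPerPartition implements MapPartitionFunction<String, Long> {
        @Override
        public void mapPartition(Iterable<String> values, Collector<Long> out) {
            long count = 0;
            for (String ignored : values) {
                count++;
            }
            out.collect(count);
        }
    }
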
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
new file mode 100644
index 0000000..de3cd22
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.operators.HashJoinBuildFirstProperties;
+import org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties;
+import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
+import org.apache.flink.optimizer.operators.SortMergeJoinDescriptor;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * The Optimizer representation of a join operator.
+ */
+public class MatchNode extends TwoInputNode {
+       
+       private List<OperatorDescriptorDual> dataProperties;
+       
+       /**
+        * Creates a new MatchNode for the given join operator.
+        * 
+        * @param joinOperatorBase The join operator object.
+        */
+       public MatchNode(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase) {
+               super(joinOperatorBase);
+               this.dataProperties = getDataProperties(joinOperatorBase, joinOperatorBase.getJoinHint());
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Gets the contract object for this match node.
+        * 
+        * @return The contract.
+        */
+       @Override
+       public JoinOperatorBase<?, ?, ?, ?> getOperator() {
+               return (JoinOperatorBase<?, ?, ?, ?>) super.getOperator();
+       }
+
+       @Override
+       public String getName() {
+               return "Join";
+       }
+
+       @Override
+       protected List<OperatorDescriptorDual> getPossibleProperties() {
+               return this.dataProperties;
+       }
+       
+       public void makeJoinWithSolutionSet(int solutionsetInputIndex) {
+               OperatorDescriptorDual op;
+               if (solutionsetInputIndex == 0) {
+                       op = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
+               } else if (solutionsetInputIndex == 1) {
+                       op = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
+               } else {
+                       throw new IllegalArgumentException();
+               }
+               
+               this.dataProperties = Collections.singletonList(op);
+       }
+       
+       /**
+        * The default estimates build on the principle of inclusion: The smaller input key domain is included in the larger
+        * input key domain. We also assume that every key from the larger input has one join partner in the smaller input.
+        * The result cardinality is hence the larger one.
+        */
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+               long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
+               long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
+               this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : Math.max(card1, card2);
+               
+               if (this.estimatedNumRecords >= 0) {
+                       float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
+                       float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
+                       float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;
+                       
+                       if (width > 0) {
+                               this.estimatedOutputSize = (long) (width * this.estimatedNumRecords);
+                       }
+               }
+       }
+       
+       private List<OperatorDescriptorDual> getDataProperties(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint) {
+               // see if an internal hint dictates the strategy to use
+               Configuration conf = joinOperatorBase.getParameters();
+               String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
+
+               if (localStrategy != null) {
+                       final OperatorDescriptorDual fixedDriverStrat;
+                       if (Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) ||
+                               Optimizer.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) ||
+                               Optimizer.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) ||
+                               Optimizer.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) )
+                       {
+                               fixedDriverStrat = new SortMergeJoinDescriptor(this.keys1, this.keys2);
+                       } else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) {
+                               fixedDriverStrat = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
+                       } else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) {
+                               fixedDriverStrat = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
+                       } else {
+                               throw new CompilerException("Invalid local strategy hint for match contract: " + localStrategy);
+                       }
+                       ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
+                       list.add(fixedDriverStrat);
+                       return list;
+               }
+               else {
+                       ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
+                       
+                       joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint;
+                       
+                       switch (joinHint) {
+                               case BROADCAST_HASH_FIRST:
+                                       list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, true, false, false));
+                                       break;
+                               case BROADCAST_HASH_SECOND:
+                                       list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, true, false));
+                                       break;
+                               case REPARTITION_HASH_FIRST:
+                                       list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, false, false, true));
+                                       break;
+                               case REPARTITION_HASH_SECOND:
+                                       list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true));
+                                       break;
+                               case REPARTITION_SORT_MERGE:
+                                       list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2, false, false, true));
+                                       break;
+                               case OPTIMIZER_CHOOSES:
+                                       list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2));
+                                       list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2));
+                                       list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2));
+                                       break;
+                               default:
+                                       throw new CompilerException("Unrecognized join hint: " + joinHint);
+                       }
+                       
+                       return list;
+               }
+       }
+}

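To make the inclusion-principle estimate used by both JoinNode and MatchNode concrete,
a worked example with assumed input statistics:

    // Assumed: card1 = 1,000,000 records at 16 bytes avg., card2 = 50,000 records at 24 bytes avg.
    // Every key of the larger input is assumed to find exactly one partner, so:
    //   estimatedNumRecords = max(1000000, 50000) = 1,000,000
    //   width               = 16 + 24             = 40 bytes per joined record
    //   estimatedOutputSize = 40 * 1000000        = 40,000,000 bytes
    // If either cardinality or width is unknown (negative), the corresponding
    // estimate simply stays at -1.
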
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/NoOpNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/NoOpNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/NoOpNode.java
new file mode 100644
index 0000000..76467cf
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/NoOpNode.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.NoOpDescriptor;
+
+/**
+ * The optimizer's internal representation of a <i>No Operation</i> node.
+ */
+public class NoOpNode extends UnaryOperatorNode {
+       
+       public NoOpNode() {
+               super("No Op", new FieldSet(), new NoOpDescriptor());
+       }
+       
+       public NoOpNode(String name) {
+               super(name, new FieldSet(), new NoOpDescriptor());
+       }
+       
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+               this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
+               this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
new file mode 100644
index 0000000..0cad34e
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
@@ -0,0 +1,1172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.operators.AbstractUdfOperator;
+import org.apache.flink.api.common.operators.CompilerHints;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.costs.CostEstimator;
+import org.apache.flink.optimizer.dataproperties.InterestingProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plandump.DumpableConnection;
+import org.apache.flink.optimizer.plandump.DumpableNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Visitable;
+import org.apache.flink.util.Visitor;
+
+/**
+ * The OptimizerNode is the base class of all nodes in the optimizer DAG. The optimizer DAG is the
+ * optimizer's representation of a program, created before the actual optimization (which creates different
+ * candidate plans and computes their cost).
+ * <p>
+ * Nodes in the DAG correspond (almost) one-to-one to the operators in a program. The optimizer DAG is constructed
+ * to hold the additional information that the optimizer needs:
+ * <ul>
+ *     <li>Estimates of the data size processed by each operator</li>
+ *     <li>Helper structures to track where the data flow "splits" and "joins", to support flows that are
+ *         DAGs but not trees.</li>
+ *     <li>Tags and weights to differentiate between loop-variant and -invariant parts of an iteration</li>
+ *     <li>Interesting properties to be used during the enumeration of candidate plans</li>
+ * </ul>
+ */
+public abstract class OptimizerNode implements Visitable<OptimizerNode>, EstimateProvider, DumpableNode<OptimizerNode> {
+       
+       public static final int MAX_DYNAMIC_PATH_COST_WEIGHT = 100;
+       
+       // --------------------------------------------------------------------------------------------
+       //                                      Members
+       // --------------------------------------------------------------------------------------------
+
+       private final Operator<?> operator; // The operator (Reduce / Join / DataSource / ...)
+       
+       private List<String> broadcastConnectionNames = new ArrayList<String>(); // the broadcast input names of this node
+       
+       private List<DagConnection> broadcastConnections = new ArrayList<DagConnection>(); // the broadcast inputs of this node
+       
+       private List<DagConnection> outgoingConnections; // The links to succeeding nodes
+
+       private InterestingProperties intProps; // the interesting properties of this node
+       
+       // --------------------------------- Branch Handling ------------------------------------------
+
+       protected List<UnclosedBranchDescriptor> openBranches; // stack of branches in the sub-graph that are not joined
+       
+       protected Set<OptimizerNode> closedBranchingNodes;      // stack of branching nodes which have already been closed
+       
+       protected List<OptimizerNode> hereJoinedBranches;       // the branching nodes (node with multiple outputs)
+       //                                                                              that are partially joined (through multiple inputs or broadcast vars)
+
+       // ---------------------------- Estimates and Annotations -------------------------------------
+       
+       protected long estimatedOutputSize = -1; // the estimated size of the output (bytes)
+
+       protected long estimatedNumRecords = -1; // the estimated number of key/value pairs in the output
+       
+       protected Set<FieldSet> uniqueFields; // set of attributes that will always be unique after this node
+
+       // --------------------------------- General Parameters ---------------------------------------
+       
+       private int parallelism = -1; // the number of parallel instances of this node
+       
+       private long minimalMemoryPerSubTask = -1;
+
+       protected int id = -1;                          // the id for this node.
+       
+       protected int costWeight = 1;           // factor to weight the costs for dynamic paths
+       
+       protected boolean onDynamicPath;
+       
+       protected List<PlanNode> cachedPlans;   // cache candidates, because they may be accessed repeatedly
+
+       // ------------------------------------------------------------------------
+       //                      Constructor / Setup
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates a new optimizer node that represents the given program operator.
+        * 
+        * @param op The operator that the node represents.
+        */
+       public OptimizerNode(Operator<?> op) {
+               this.operator = op;
+               readStubAnnotations();
+       }
+       
+       protected OptimizerNode(OptimizerNode toCopy) {
+               this.operator = toCopy.operator;
+               this.intProps = toCopy.intProps;
+               
+               this.openBranches = toCopy.openBranches;
+               this.closedBranchingNodes = toCopy.closedBranchingNodes;
+               
+               this.estimatedOutputSize = toCopy.estimatedOutputSize;
+               this.estimatedNumRecords = toCopy.estimatedNumRecords;
+               
+               this.parallelism = toCopy.parallelism;
+               this.minimalMemoryPerSubTask = toCopy.minimalMemoryPerSubTask;
+               
+               this.id = toCopy.id;
+               this.costWeight = toCopy.costWeight;
+               this.onDynamicPath = toCopy.onDynamicPath;
+       }
+
+       // ------------------------------------------------------------------------
+       //  Methods specific to unary- / binary- / special nodes
+       // ------------------------------------------------------------------------
+
+       /**
+        * Gets the name of this node, which is the name of the function/operator, or
+        * data source / data sink.
+        * 
+        * @return The node name.
+        */
+       public abstract String getName();
+
+       /**
+        * This function connects the predecessors to this operator.
+        *
+        * @param operatorToNode The map from program operators to optimizer nodes.
+        * @param defaultExchangeMode The data exchange mode to use, if the operator does not
+        *                            specify one.
+        */
+       public abstract void setInput(Map<Operator<?>, OptimizerNode> operatorToNode,
+                                                                       ExecutionMode defaultExchangeMode);
+
+       /**
+        * This function connects the operators that produce the broadcast inputs to this operator.
+        *
+        * @param operatorToNode The map from program operators to optimizer nodes.
+        * @param defaultExchangeMode The data exchange mode to use, if the operator does not
+        *                            specify one.
+        *
+        * @throws CompilerException
+        */
+       public void setBroadcastInputs(Map<Operator<?>, OptimizerNode> operatorToNode, ExecutionMode defaultExchangeMode) {
+               // skip for Operators that don't support broadcast variables 
+               if (!(getOperator() instanceof AbstractUdfOperator<?, ?>)) {
+                       return;
+               }
+
+               // get all broadcast inputs
+               AbstractUdfOperator<?, ?> operator = ((AbstractUdfOperator<?, ?>) getOperator());
+
+               // create connections and add them
+               for (Map.Entry<String, Operator<?>> input : operator.getBroadcastInputs().entrySet()) {
+                       OptimizerNode predecessor = operatorToNode.get(input.getValue());
+                       DagConnection connection = new DagConnection(predecessor, this,
+                                                                                                       ShipStrategyType.BROADCAST, defaultExchangeMode);
+                       addBroadcastConnection(input.getKey(), connection);
+                       predecessor.addOutgoingConnection(connection);
+               }
+       }
+
+       /**
+        * Gets all incoming connections of this node.
+        * This method needs to be overridden by subclasses to return the children.
+        * 
+        * @return The list of incoming connections.
+        */
+       public abstract List<DagConnection> getIncomingConnections();
+
+       /**
+        * Tells the node to compute the interesting properties for its inputs. The interesting properties
+        * for the node itself must have been computed before.
+        * The node must then see how many of the interesting properties it preserves and add its own.
+        * 
+        * @param estimator The {@code CostEstimator} instance to use for plan cost estimation.
+        */
+       public abstract void computeInterestingPropertiesForInputs(CostEstimator estimator);
+
+       /**
+        * This method causes the node to compute the description of open branches in its sub-plan. An open branch
+        * means that a (transitive) child node had multiple outputs which have not all been re-joined in the
+        * sub-plan. This method needs to set the <code>openBranches</code> field to a stack of unclosed branches, the
+        * latest one on top. A branch is considered closed if some later node sees all of the branching node's outputs,
+        * no matter if there have been more branches to different paths in the meantime.
+        */
+       public abstract void computeUnclosedBranchStack();
+       
+       
+       protected List<UnclosedBranchDescriptor> computeUnclosedBranchStackForBroadcastInputs(
+                                                                                               List<UnclosedBranchDescriptor> branchesSoFar)
+       {
+               // handle the data flow branching for the broadcast inputs
+               for (DagConnection broadcastInput : getBroadcastConnections()) {
+                       OptimizerNode bcSource = broadcastInput.getSource();
+                       addClosedBranches(bcSource.closedBranchingNodes);
+                       
+                       List<UnclosedBranchDescriptor> bcBranches = bcSource.getBranchesForParent(broadcastInput);
+                       
+                       ArrayList<UnclosedBranchDescriptor> mergedBranches = new ArrayList<UnclosedBranchDescriptor>();
+                       mergeLists(branchesSoFar, bcBranches, mergedBranches, true);
+                       branchesSoFar = mergedBranches.isEmpty() ? Collections.<UnclosedBranchDescriptor>emptyList() : mergedBranches;
+               }
+               
+               return branchesSoFar;
+       }
+
+       /**
+        * Computes the plan alternatives for this node, and implicitly for all nodes that are children of
+        * this node. This method must determine for each alternative the global and local properties
+        * and the costs. This method may recursively call <code>getAlternatives()</code> on its children
+        * to get their plan alternatives, and build its own alternatives on top of those.
+        * 
+        * @param estimator
+        *        The cost estimator used to estimate the costs of each plan alternative.
+        * @return A list containing all plan alternatives.
+        */
+       public abstract List<PlanNode> getAlternativePlans(CostEstimator estimator);
+
+       /**
+        * This method implements the visit of a depth-first graph traversing visitor. Implementers must first
+        * call the <code>preVisit()</code> method, then hand the visitor to their children, and finally call
+        * the <code>postVisit()</code> method.
+        * 
+        * @param visitor
+        *        The graph traversing visitor.
+        * @see org.apache.flink.util.Visitable#accept(org.apache.flink.util.Visitor)
+        */
+       @Override
+       public abstract void accept(Visitor<OptimizerNode> visitor);
+
+       public abstract SemanticProperties getSemanticProperties();
+
+       // ------------------------------------------------------------------------
+       //                          Getters / Setters
+       // ------------------------------------------------------------------------
+
+       @Override
+       public Iterable<OptimizerNode> getPredecessors() {
+               List<OptimizerNode> allPredecessors = new ArrayList<OptimizerNode>();
+
+               for (DagConnection dagConnection : getIncomingConnections()) {
+                       allPredecessors.add(dagConnection.getSource());
+               }
+               
+               for (DagConnection conn : getBroadcastConnections()) {
+                       allPredecessors.add(conn.getSource());
+               }
+               
+               return allPredecessors;
+       }
+       
+       /**
+        * Gets the ID of this node. If the id has not yet been set, this method returns -1.
+        * 
+        * @return This node's id, or -1, if not yet set.
+        */
+       public int getId() {
+               return this.id;
+       }
+
+       /**
+        * Sets the ID of this node.
+        * 
+        * @param id
+        *        The id for this node.
+        */
+       public void initId(int id) {
+               if (id <= 0) {
+                       throw new IllegalArgumentException();
+               }
+               
+               if (this.id == -1) {
+                       this.id = id;
+               } else {
+                       throw new IllegalStateException("Id has already been initialized.");
+               }
+       }
+
+       /**
+        * Adds the broadcast connection identified by the given {@code name} to this node.
+        * 
+        * @param broadcastConnection The connection to add.
+        */
+       public void addBroadcastConnection(String name, DagConnection broadcastConnection) {
+               this.broadcastConnectionNames.add(name);
+               this.broadcastConnections.add(broadcastConnection);
+       }
+
+       /**
+        * Return the list of names associated with broadcast inputs for this node.
+        */
+       public List<String> getBroadcastConnectionNames() {
+               return this.broadcastConnectionNames;
+       }
+
+       /**
+        * Return the list of inputs associated with broadcast variables for this node.
+        */
+       public List<DagConnection> getBroadcastConnections() {
+               return this.broadcastConnections;
+       }
+
+       /**
+        * Adds a new outgoing connection to this node.
+        * 
+        * @param connection
+        *        The connection to add.
+        */
+       public void addOutgoingConnection(DagConnection connection) {
+               if (this.outgoingConnections == null) {
+                       this.outgoingConnections = new ArrayList<DagConnection>();
+               } else {
+                       if (this.outgoingConnections.size() == 64) {
+                               throw new CompilerException("Cannot currently handle nodes with more than 64 outputs.");
+                       }
+               }
+
+               this.outgoingConnections.add(connection);
+       }
+
+       /**
+        * The list of outgoing connections from this node to succeeding tasks.
+        * 
+        * @return The list of outgoing connections.
+        */
+       public List<DagConnection> getOutgoingConnections() {
+               return this.outgoingConnections;
+       }
+
+       /**
+        * Gets the operator represented by this optimizer node.
+        * 
+        * @return This node's operator.
+        */
+       public Operator<?> getOperator() {
+               return this.operator;
+       }
+
+       /**
+        * Gets the parallelism for the operator represented by this optimizer node.
+        * The parallelism denotes how many parallel instances of the operator will be
+        * spawned during the execution. If this value is <code>-1</code>, then the system will take
+        * the default number of parallel instances.
+        * 
+        * @return The parallelism of the operator.
+        */
+       public int getParallelism() {
+               return this.parallelism;
+       }
+
+       /**
+        * Sets the parallelism for this optimizer node.
+        * The parallelism denotes how many parallel instances of the operator will be
+        * spawned during the execution. If this value is set to <code>-1</code>, then the system will take
+        * the default number of parallel instances.
+        * 
+        * @param parallelism The parallelism to set.
+        * @throws IllegalArgumentException If the parallelism is smaller than one and not -1.
+        */
+       public void setDegreeOfParallelism(int parallelism) {
+               if (parallelism < 1 && parallelism != -1) {
+                       throw new IllegalArgumentException("Degree of parallelism of " + parallelism + " is invalid.");
+               }
+               this.parallelism = parallelism;
+       }
+       
+       /**
+        * Gets the amount of memory that all subtasks of this task have jointly available.
+        * 
+        * @return The total amount of memory across all subtasks.
+        */
+       public long getMinimalMemoryAcrossAllSubTasks() {
+               return this.minimalMemoryPerSubTask == -1 ? -1 : this.minimalMemoryPerSubTask * this.parallelism;
+       }
+       
+       public boolean isOnDynamicPath() {
+               return this.onDynamicPath;
+       }
+       
+       public void identifyDynamicPath(int costWeight) {
+               boolean anyDynamic = false;
+               boolean allDynamic = true;
+               
+               for (DagConnection conn : getIncomingConnections()) {
+                       boolean dynamicIn = conn.isOnDynamicPath();
+                       anyDynamic |= dynamicIn;
+                       allDynamic &= dynamicIn;
+               }
+               
+               for (DagConnection conn : getBroadcastConnections()) {
+                       boolean dynamicIn = conn.isOnDynamicPath();
+                       anyDynamic |= dynamicIn;
+                       allDynamic &= dynamicIn;
+               }
+               
+               if (anyDynamic) {
+                       this.onDynamicPath = true;
+                       this.costWeight = costWeight;
+                       if (!allDynamic) {
+                               // this node joins static and dynamic path.
+                               // mark the connections where the source is not dynamic as cached
+                               for (DagConnection conn : getIncomingConnections()) {
+                                       if (!conn.getSource().isOnDynamicPath()) {
+                                               conn.setMaterializationMode(conn.getMaterializationMode().makeCached());
+                                       }
+                               }
+                               
+                               // broadcast variables are always cached, because they remain unchanged and available in the
+                               // runtime context of the functions
+                       }
+               }
+       }
+       
+       public int getCostWeight() {
+               return this.costWeight;
+       }
+       
+       public int getMaxDepth() {
+               int maxDepth = 0;
+               for (DagConnection conn : getIncomingConnections()) {
+                       maxDepth = Math.max(maxDepth, conn.getMaxDepth());
+               }
+               for (DagConnection conn : getBroadcastConnections()) {
+                       maxDepth = Math.max(maxDepth, conn.getMaxDepth());
+               }
+               
+               return maxDepth;
+       }
+
+       /**
+        * Gets the properties that are interesting for this node to produce.
+        * 
+        * @return The interesting properties for this node, or null, if not yet computed.
+        */
+       public InterestingProperties getInterestingProperties() {
+               return this.intProps;
+       }
+
+       @Override
+       public long getEstimatedOutputSize() {
+               return this.estimatedOutputSize;
+       }
+
+       @Override
+       public long getEstimatedNumRecords() {
+               return this.estimatedNumRecords;
+       }
+       
+       public void setEstimatedOutputSize(long estimatedOutputSize) {
+               this.estimatedOutputSize = estimatedOutputSize;
+       }
+
+       public void setEstimatedNumRecords(long estimatedNumRecords) {
+               this.estimatedNumRecords = estimatedNumRecords;
+       }
+       
+       @Override
+       public float getEstimatedAvgWidthPerOutputRecord() {
+               if (this.estimatedOutputSize > 0 && this.estimatedNumRecords > 0) {
+                       return ((float) this.estimatedOutputSize) / this.estimatedNumRecords;
+               } else {
+                       return -1.0f;
+               }
+       }
+
+       /**
+        * Checks whether this node has branching output. A node's output is branched if it has more
+        * than one output connection.
+        * 
+        * @return True, if the node's output branches. False otherwise.
+        */
+       public boolean isBranching() {
+               return getOutgoingConnections() != null && getOutgoingConnections().size() > 1;
+       }
+
+       public void markAllOutgoingConnectionsAsPipelineBreaking() {
+               if (this.outgoingConnections == null) {
+                       throw new IllegalStateException("The outgoing connections have not yet been initialized.");
+               }
+               for (DagConnection conn : getOutgoingConnections()) {
+                       conn.markBreaksPipeline();
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //                              Miscellaneous
+       // ------------------------------------------------------------------------
+
+       /**
+        * Checks if all outgoing connections have their interesting properties set from their target nodes.
+        * 
+        * @return True, if on all outgoing connections, the interesting properties are set. False otherwise.
+        */
+       public boolean haveAllOutputConnectionInterestingProperties() {
+               for (DagConnection conn : getOutgoingConnections()) {
+                       if (conn.getInterestingProperties() == null) {
+                               return false;
+                       }
+               }
+               return true;
+       }
+
+       /**
+        * Computes all the interesting properties that are relevant to this node. The interesting
+        * properties are a union of the interesting properties on each outgoing connection.
+        * However, if two interesting properties on the outgoing connections overlap,
+        * the interesting properties will occur only once in this set. For that, this
+        * method deduplicates and merges the interesting properties.
+        * This method returns copies of the original interesting properties objects and
+        * leaves the original objects, contained by the connections, unchanged.
+        */
+       public void computeUnionOfInterestingPropertiesFromSuccessors() {
+               List<DagConnection> conns = getOutgoingConnections();
+               if (conns.size() == 0) {
+                       // no outgoing connections, so no interesting properties arrive from successors
+                       this.intProps = new InterestingProperties();
+               } else {
+                       this.intProps = conns.get(0).getInterestingProperties().clone();
+                       for (int i = 1; i < conns.size(); i++) {
+                               this.intProps.addInterestingProperties(conns.get(i).getInterestingProperties());
+                       }
+               }
+               this.intProps.dropTrivials();
+       }
+       
+       public void clearInterestingProperties() {
+               this.intProps = null;
+               for (DagConnection conn : getIncomingConnections()) {
+                       conn.clearInterestingProperties();
+               }
+               for (DagConnection conn : getBroadcastConnections()) {
+                       conn.clearInterestingProperties();
+               }
+       }
+       
+       /**
+        * Causes this node to compute its output estimates (such as number of rows, size in bytes)
+        * based on the inputs and the compiler hints. The compiler hints are instantiated with conservative
+        * default values which are used if no other values are provided. Nodes may access the statistics to
+        * determine relevant information.
+        * 
+        * @param statistics
+        *        The statistics object which may be accessed to get statistical information.
+        *        The parameter may be null, if no statistics are available.
+        */
+       public void computeOutputEstimates(DataStatistics statistics) {
+               // sanity checking
+               for (DagConnection c : getIncomingConnections()) {
+                       if (c.getSource() == null) {
+                               throw new CompilerException("Bug: Estimate computation called before inputs have been set.");
+                       }
+               }
+               
+               // let every operator do its computation
+               computeOperatorSpecificDefaultEstimates(statistics);
+               
+               if (this.estimatedOutputSize < 0) {
+                       this.estimatedOutputSize = -1;
+               }
+               if (this.estimatedNumRecords < 0) {
+                       this.estimatedNumRecords = -1;
+               }
+               
+               // overwrite default estimates with hints, if given
+               if (getOperator() == null || getOperator().getCompilerHints() == null) {
+                       return;
+               }
+               
+               CompilerHints hints = getOperator().getCompilerHints();
+               if (hints.getOutputSize() >= 0) {
+                       this.estimatedOutputSize = hints.getOutputSize();
+               }
+               
+               if (hints.getOutputCardinality() >= 0) {
+                       this.estimatedNumRecords = hints.getOutputCardinality();
+               }
+               
+               if (hints.getFilterFactor() >= 0.0f) {
+                       if (this.estimatedNumRecords >= 0) {
+                               this.estimatedNumRecords = (long) (this.estimatedNumRecords * hints.getFilterFactor());
+                               
+                               if (this.estimatedOutputSize >= 0) {
+                                       this.estimatedOutputSize = (long) (this.estimatedOutputSize * hints.getFilterFactor());
+                               }
+                       }
+                       else if (this instanceof SingleInputNode) {
+                               OptimizerNode pred = ((SingleInputNode) 
this).getPredecessorNode();
+                               if (pred != null && 
pred.getEstimatedNumRecords() >= 0) {
+                                       this.estimatedNumRecords = (long) 
(pred.getEstimatedNumRecords() * hints.getFilterFactor());
+                               }
+                       }
+               }
+               
+               // use the width to infer the cardinality (given size) and vice 
versa
+               if (hints.getAvgOutputRecordSize() >= 1) {
+                       // the estimated number of rows based on size
+                       if (this.estimatedNumRecords == -1 && 
this.estimatedOutputSize >= 0) {
+                               this.estimatedNumRecords = (long) 
(this.estimatedOutputSize / hints.getAvgOutputRecordSize());
+                       }
+                       else if (this.estimatedOutputSize == -1 && 
this.estimatedNumRecords >= 0) {
+                               this.estimatedOutputSize = (long) 
(this.estimatedNumRecords * hints.getAvgOutputRecordSize());
+                       }
+               }
+       }
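
[A minimal sketch, with made-up numbers, of the width-based inference at the
end of computeOutputEstimates() above: when a hint gives the average record
width, a missing cardinality can be derived from the size estimate, and vice
versa. Names and values here are illustrative, not part of the patch.]

    // assumed hint: ~16 bytes per record; the defaults yielded only a size estimate
    float avgOutputRecordSize = 16.0f;
    long estimatedOutputSize = 1_600_000L;
    long estimatedNumRecords = -1L;               // unknown so far

    // mirrors the width-based inference in computeOutputEstimates()
    if (estimatedNumRecords == -1 && estimatedOutputSize >= 0) {
        estimatedNumRecords = (long) (estimatedOutputSize / avgOutputRecordSize);
    }
    // estimatedNumRecords is now 100_000
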
+       
+       protected abstract void 
computeOperatorSpecificDefaultEstimates(DataStatistics statistics);
+       
+       // 
------------------------------------------------------------------------
+       // Reading of stub annotations
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Reads all stub annotations, i.e. which fields remain constant, what 
cardinality bounds the
+        * functions have, which fields remain unique.
+        */
+       protected void readStubAnnotations() {
+               readUniqueFieldsAnnotation();
+       }
+       
+       protected void readUniqueFieldsAnnotation() {
+               if (this.operator.getCompilerHints() != null) {
+                       Set<FieldSet> uniqueFieldSets = 
operator.getCompilerHints().getUniqueFields();
+                       if (uniqueFieldSets != null) {
+                               if (this.uniqueFields == null) {
+                                       this.uniqueFields = new 
HashSet<FieldSet>();
+                               }
+                               this.uniqueFields.addAll(uniqueFieldSets);
+                       }
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       // Access of stub annotations
+       // 
------------------------------------------------------------------------
+       
+       /**
+        * Gets the FieldSets which are unique in the output of the node. 
+        */
+       public Set<FieldSet> getUniqueFields() {
+               return this.uniqueFields == null ? 
Collections.<FieldSet>emptySet() : this.uniqueFields;
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //                                    Pruning
+       // 
--------------------------------------------------------------------------------------------
+       
+       protected void prunePlanAlternatives(List<PlanNode> plans) {
+               if (plans.isEmpty()) {
+                       throw new CompilerException("No plan meeting the 
requirements could be created @ " + this + ". Most likely reason: Too 
restrictive plan hints.");
+               }
+               // shortcut for the simple case
+               if (plans.size() == 1) {
+                       return;
+               }
+               
+               // we can only compare plan candidates that made equal choices
+               // at the branching points. for each choice at a branching 
point,
+               // we need to keep the cheapest (wrt. interesting properties).
+               // if we do not keep candidates for each branch choice, we 
might not
+               // find branch compatible candidates when joining the branches 
back.
+               
+               // for pruning, we are quasi AFTER the node, so in the presence of
+               // branches, we need to form the per-branch-choice groups by the choice
+               // they made at the latest un-joined branching node. Note that this is
+               // different from the check for branch compatibility of candidates, as
+               // that check happens on the input sub-plans and hence BEFORE the node
+               // (there, it is relevant to find the latest (partially) joined branch point)
+               
+               if (this.openBranches == null || this.openBranches.isEmpty()) {
+                       prunePlanAlternativesWithCommonBranching(plans);
+               } else {
+                       // partition the candidates into groups that made the 
same sub-plan candidate
+                       // choice at the latest unclosed branch point
+                       
+                       final OptimizerNode[] branchDeterminers = new 
OptimizerNode[this.openBranches.size()];
+                       
+                       for (int i = 0; i < branchDeterminers.length; i++) {
+                               branchDeterminers[i] = 
this.openBranches.get(this.openBranches.size() - 1 - i).getBranchingNode();
+                       }
+                       
+                       // this sorter sorts by the candidate choice at the 
branch point
+                       Comparator<PlanNode> sorter = new 
Comparator<PlanNode>() {
+                               
+                               @Override
+                               public int compare(PlanNode o1, PlanNode o2) {
+                                       for (OptimizerNode branchDeterminer : 
branchDeterminers) {
+                                               PlanNode n1 = 
o1.getCandidateAtBranchPoint(branchDeterminer);
+                                               PlanNode n2 = 
o2.getCandidateAtBranchPoint(branchDeterminer);
+                                               int hash1 = 
System.identityHashCode(n1);
+                                               int hash2 = 
System.identityHashCode(n2);
+
+                                               if (hash1 != hash2) {
+                                                       return hash1 - hash2;
+                                               }
+                                       }
+                                       return 0;
+                               }
+                       };
+                       Collections.sort(plans, sorter);
+                       
+                       List<PlanNode> result = new ArrayList<PlanNode>();
+                       List<PlanNode> turn = new ArrayList<PlanNode>();
+                       
+                       final PlanNode[] determinerChoice = new 
PlanNode[branchDeterminers.length];
+
+                       while (!plans.isEmpty()) {
+                               // take one as the determiner
+                               turn.clear();
+                               PlanNode determiner = plans.remove(plans.size() 
- 1);
+                               turn.add(determiner);
+                               
+                               for (int i = 0; i < determinerChoice.length; 
i++) {
+                                       determinerChoice[i] = 
determiner.getCandidateAtBranchPoint(branchDeterminers[i]);
+                               }
+
+                               // go backwards through the plans and find all 
that are equal
+                               boolean stillEqual = true;
+                               for (int k = plans.size() - 1; k >= 0 && 
stillEqual; k--) {
+                                       PlanNode toCheck = plans.get(k);
+                                       
+                                       for (int i = 0; i < 
branchDeterminers.length; i++) {
+                                               PlanNode checkerChoice = 
toCheck.getCandidateAtBranchPoint(branchDeterminers[i]);
+                                       
+                                               if (checkerChoice != 
determinerChoice[i]) {
+                                                       // not the same anymore
+                                                       stillEqual = false;
+                                                       break;
+                                               }
+                                       }
+                                       
+                                       if (stillEqual) {
+                                               // the same
+                                               plans.remove(k);
+                                               turn.add(toCheck);
+                                       }
+                               }
+
+                               // now that we have only plans with the same 
branch alternatives, prune!
+                               if (turn.size() > 1) {
+                                       
prunePlanAlternativesWithCommonBranching(turn);
+                               }
+                               result.addAll(turn);
+                       }
+
+                       // after all turns are complete
+                       plans.clear();
+                       plans.addAll(result);
+               }
+       }
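
[A standalone sketch of the identity-based ordering used above: sorting by
System.identityHashCode makes candidates that picked the same sub-plan object
adjacent, so equal-choice groups can be peeled off in one backward pass. The
plain Object values stand in for candidate sub-plans; this illustrates the
trick, not the optimizer's API.]

    Object choiceA = new Object();
    Object choiceB = new Object();
    List<Object> byChoice = new ArrayList<Object>(
            Arrays.asList(choiceA, choiceB, choiceA, choiceB));

    Collections.sort(byChoice, new Comparator<Object>() {
        @Override
        public int compare(Object o1, Object o2) {
            // same trick as above: order by reference identity
            return System.identityHashCode(o1) - System.identityHashCode(o2);
        }
    });
    // both occurrences of choiceA are now adjacent, and likewise for choiceB
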
+       
+       protected void prunePlanAlternativesWithCommonBranching(List<PlanNode> 
plans) {
+               // for each interesting property, which plans are cheapest
+               final RequestedGlobalProperties[] gps = 
this.intProps.getGlobalProperties().toArray(
+                                                       new 
RequestedGlobalProperties[this.intProps.getGlobalProperties().size()]);
+               final RequestedLocalProperties[] lps = 
this.intProps.getLocalProperties().toArray(
+                                                       new 
RequestedLocalProperties[this.intProps.getLocalProperties().size()]);
+               
+               final PlanNode[][] toKeep = new PlanNode[gps.length][];
+               final PlanNode[] cheapestForGlobal = new PlanNode[gps.length];
+               
+               
+               PlanNode cheapest = null; // the overall cheapest plan
+
+               // go over all plans from the list
+               for (PlanNode candidate : plans) {
+                       // check if that plan is the overall cheapest
+                       if (cheapest == null || 
(cheapest.getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0)) {
+                               cheapest = candidate;
+                       }
+
+                       // find the interesting global properties that this 
plan matches
+                       for (int i = 0; i < gps.length; i++) {
+                               if 
(gps[i].isMetBy(candidate.getGlobalProperties())) {
+                                       // the candidate meets the global 
property requirements. That means
+                                       // it has a chance that its local 
properties are re-used (they would be
+                                       // destroyed if global properties need 
to be established)
+                                       
+                                       if (cheapestForGlobal[i] == null || 
(cheapestForGlobal[i].getCumulativeCosts().compareTo(candidate.getCumulativeCosts())
 > 0)) {
+                                               cheapestForGlobal[i] = 
candidate;
+                                       }
+                                       
+                                       final PlanNode[] localMatches;
+                                       if (toKeep[i] == null) {
+                                               localMatches = new 
PlanNode[lps.length];
+                                               toKeep[i] = localMatches;
+                                       } else {
+                                               localMatches = toKeep[i];
+                                       }
+                                       
+                                       for (int k = 0; k < lps.length; k++) {
+                                               if 
(lps[k].isMetBy(candidate.getLocalProperties())) {
+                                                       final PlanNode previous 
= localMatches[k];
+                                                       if (previous == null || 
previous.getCumulativeCosts().compareTo(candidate.getCumulativeCosts()) > 0) {
+                                                               // this one is 
cheaper!
+                                                               localMatches[k] 
= candidate;
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+               }
+
+               // all plans are set now
+               plans.clear();
+
+               // add the cheapest plan
+               if (cheapest != null) {
+                       plans.add(cheapest);
+                       cheapest.setPruningMarker(); // remember that this plan is in the set
+               }
+
+               // add all others, which are optimal for some interesting 
properties
+               for (int i = 0; i < gps.length; i++) {
+                       if (toKeep[i] != null) {
+                               final PlanNode[] localMatches = toKeep[i];
+                               for (final PlanNode n : localMatches) {
+                                       if (n != null && !n.isPruneMarkerSet()) 
{
+                                               n.setPruningMarker();
+                                               plans.add(n);
+                                       }
+                               }
+                       }
+                       if (cheapestForGlobal[i] != null) {
+                               final PlanNode n = cheapestForGlobal[i];
+                               if (!n.isPruneMarkerSet()) {
+                                       n.setPruningMarker();
+                                       plans.add(n);
+                               }
+                       }
+               }
+       }
+       
+       
+       // 
--------------------------------------------------------------------------------------------
+       //                       Handling of branches
+       // 
--------------------------------------------------------------------------------------------
+
+       public boolean hasUnclosedBranches() {
+               return this.openBranches != null && 
!this.openBranches.isEmpty();
+       }
+
+       public Set<OptimizerNode> getClosedBranchingNodes() {
+               return this.closedBranchingNodes;
+       }
+       
+       public List<UnclosedBranchDescriptor> getOpenBranches() {
+               return this.openBranches;
+       }
+
+
+       protected List<UnclosedBranchDescriptor> 
getBranchesForParent(DagConnection toParent) {
+               if (this.outgoingConnections.size() == 1) {
+                       // return our own stack of open branches, because 
nothing is added
+                       if (this.openBranches == null || 
this.openBranches.isEmpty()) {
+                               return Collections.emptyList();
+                       } else {
+                               return new 
ArrayList<UnclosedBranchDescriptor>(this.openBranches);
+                       }
+               }
+               else if (this.outgoingConnections.size() > 1) {
+                       // we branch here; add a branch descriptor to the stack
+                       List<UnclosedBranchDescriptor> branches = new ArrayList<UnclosedBranchDescriptor>(4);
+                       if (this.openBranches != null) {
+                               branches.addAll(this.openBranches);
+                       }
+
+                       // find out which output number the connection to the parent has
+                       int num;
+                       for (num = 0; num < this.outgoingConnections.size(); 
num++) {
+                               if (this.outgoingConnections.get(num) == 
toParent) {
+                                       break;
+                               }
+                       }
+                       if (num >= this.outgoingConnections.size()) {
+                               throw new CompilerException("Error in compiler: 
"
+                                       + "Parent to get branch info for is not 
contained in the outgoing connections.");
+                       }
+
+                       // create the description and add it
+                       long bitvector = 0x1L << num;
+                       branches.add(new UnclosedBranchDescriptor(this, 
bitvector));
+                       return branches;
+               }
+               else {
+                       throw new CompilerException(
+                               "Error in compiler: Cannot get branch info for 
successor in a node with no successors.");
+               }
+       }
+
+       
+       protected void removeClosedBranches(List<UnclosedBranchDescriptor> 
openList) {
+               if (openList == null || openList.isEmpty() || 
this.closedBranchingNodes == null || this.closedBranchingNodes.isEmpty()) {
+                       return;
+               }
+               
+               Iterator<UnclosedBranchDescriptor> it = openList.iterator();
+               while (it.hasNext()) {
+                       if 
(this.closedBranchingNodes.contains(it.next().getBranchingNode())) {
+                               //this branch was already closed --> remove it 
from the list
+                               it.remove();
+                       }
+               }
+       }
+       
+       protected void addClosedBranches(Set<OptimizerNode> alreadyClosed) {
+               if (alreadyClosed == null || alreadyClosed.isEmpty()) {
+                       return;
+               }
+               
+               if (this.closedBranchingNodes == null) { 
+                       this.closedBranchingNodes = new 
HashSet<OptimizerNode>(alreadyClosed);
+               } else {
+                       this.closedBranchingNodes.addAll(alreadyClosed);
+               }
+       }
+       
+       protected void addClosedBranch(OptimizerNode alreadyClosed) {
+               if (this.closedBranchingNodes == null) { 
+                       this.closedBranchingNodes = new 
HashSet<OptimizerNode>();
+               }
+
+               this.closedBranchingNodes.add(alreadyClosed);
+       }
+       
+       /**
+        * Checks whether two candidate plans for the sub-plan of this node are comparable.
+        * The two alternative plans are comparable, if
+        * 
+        * a) There is no branch in the sub-plan of this node, or
+        * b) Both candidates have the same candidate as the child at the last open branch.
+        * 
+        * @param plan1 The root node of the first candidate plan.
+        * @param plan2 The root node of the second candidate plan.
+        * @return True if the nodes are branch compatible in the inputs.
+        */
+       protected boolean areBranchCompatible(PlanNode plan1, PlanNode plan2) {
+               if (plan1 == null || plan2 == null) {
+                       throw new NullPointerException();
+               }
+               
+               // if there is no open branch, the children are always 
compatible.
+               // in most plans, that will be the dominant case
+               if (this.hereJoinedBranches == null || 
this.hereJoinedBranches.isEmpty()) {
+                       return true;
+               }
+
+               for (OptimizerNode joinedBrancher : hereJoinedBranches) {
+                       final PlanNode branch1Cand = 
plan1.getCandidateAtBranchPoint(joinedBrancher);
+                       final PlanNode branch2Cand = 
plan2.getCandidateAtBranchPoint(joinedBrancher);
+                       
+                       if (branch1Cand != null && branch2Cand != null && 
branch1Cand != branch2Cand) {
+                               return false;
+                       }
+               }
+               return true;
+       }
+       
+       /**
+        * The node IDs are assigned in graph-traversal order (pre-order), hence, each list is
+        * sorted by ID in ascending order and all consecutive lists start with IDs in ascending order.
+        *
+        * @param markJoinedBranchesAsPipelineBreaking True, if the branches re-joined at this
+        *        node should have their outgoing connections marked as pipeline breaking.
+        */
+       protected final boolean mergeLists(List<UnclosedBranchDescriptor> child1open,
+                                          List<UnclosedBranchDescriptor> child2open,
+                                          List<UnclosedBranchDescriptor> result,
+                                          boolean markJoinedBranchesAsPipelineBreaking) {
+
+               //remove branches which have already been closed
+               removeClosedBranches(child1open);
+               removeClosedBranches(child2open);
+               
+               result.clear();
+               
+               // check how many open branches we have. the cases:
+               // 1) if both are null or empty, the result is null
+               // 2) if one side is null (or empty), the result is the other 
side.
+               // 3) both are set, then we need to merge.
+               if (child1open == null || child1open.isEmpty()) {
+                       if(child2open != null && !child2open.isEmpty()) {
+                               result.addAll(child2open);
+                       }
+                       return false;
+               }
+               
+               if (child2open == null || child2open.isEmpty()) {
+                       result.addAll(child1open);
+                       return false;
+               }
+
+               int index1 = child1open.size() - 1;
+               int index2 = child2open.size() - 1;
+               
+               boolean didCloseABranch = false;
+
+               // as both lists (child1open and child2open) are sorted in ascending ID order,
+               // we can do a merge-join-like loop which preserves the order in the result list
+               // and eliminates duplicates
+               while (index1 >= 0 || index2 >= 0) {
+                       int id1 = -1;
+                       int id2 = index2 >= 0 ? 
child2open.get(index2).getBranchingNode().getId() : -1;
+
+                       while (index1 >= 0 && (id1 = 
child1open.get(index1).getBranchingNode().getId()) > id2) {
+                               result.add(child1open.get(index1));
+                               index1--;
+                       }
+                       while (index2 >= 0 && (id2 = 
child2open.get(index2).getBranchingNode().getId()) > id1) {
+                               result.add(child2open.get(index2));
+                               index2--;
+                       }
+
+                       // match: they share a common branching child
+                       if (id1 == id2) {
+                               didCloseABranch = true;
+                               
+                               // if this is the latest common child, remember it
+                               OptimizerNode currBranchingNode = child1open.get(index1).getBranchingNode();
+                               
+                               long vector1 = child1open.get(index1).getJoinedPathsVector();
+                               long vector2 = child2open.get(index2).getJoinedPathsVector();
+                               
+                               // check if this is the same descriptor (meaning that it contains the same paths).
+                               // if it is the same, add it only once; otherwise, process the join of the paths
+                               if (vector1 == vector2) {
+                                       result.add(child1open.get(index1));
+                               }
+                               else {
+                                       // we merge (re-join) a branch
+
+                                       // mark the branch as a point where we break the pipeline
+                                       if (markJoinedBranchesAsPipelineBreaking) {
+                                               currBranchingNode.markAllOutgoingConnectionsAsPipelineBreaking();
+                                       }
+
+                                       if (this.hereJoinedBranches == null) {
+                                               this.hereJoinedBranches = new ArrayList<OptimizerNode>(2);
+                                       }
+                                       this.hereJoinedBranches.add(currBranchingNode);
+
+                                       // see if this node closes the branch
+                                       long joinedInputs = vector1 | vector2;
+
+                                       // this is 2^size - 1, which is all bits set at positions 0..size-1
+                                       long allInputs = (0x1L << currBranchingNode.getOutgoingConnections().size()) - 1;
+
+                                       if (joinedInputs == allInputs) {
+                                               // closed - we can remove it from the stack
+                                               addClosedBranch(currBranchingNode);
+                                       } else {
+                                               // not quite closed
+                                               result.add(new UnclosedBranchDescriptor(currBranchingNode, joinedInputs));
+                                       }
+                               }
+
+                               index1--;
+                               index2--;
+                       }
+               }
+
+               // merged. now we need to reverse the list, because we added 
the elements in reverse order
+               Collections.reverse(result);
+               return didCloseABranch;
+       }
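
[A small illustration, with made-up values, of the joined-paths bit vector
used in mergeLists() above: bit positions correspond to the branching node's
outgoing connections, and a branch closes once all positions are set.]

    long vector1 = 0x1L << 0;                     // path came through output #0: 0b001
    long vector2 = 0x1L << 2;                     // path came through output #2: 0b100
    long joinedInputs = vector1 | vector2;        // 0b101

    long allInputs = (0x1L << 3) - 1;             // branching node with 3 outputs: 0b111
    boolean closed = (joinedInputs == allInputs); // false: output #1 is still open
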
+
+       @Override
+       public OptimizerNode getOptimizerNode() {
+               return this;
+       }
+       
+       @Override
+       public PlanNode getPlanNode() {
+               return null;
+       }
+       
+       @Override
+       public Iterable<DumpableConnection<OptimizerNode>> getDumpableInputs() {
+               List<DumpableConnection<OptimizerNode>> allInputs = new 
ArrayList<DumpableConnection<OptimizerNode>>();
+               
+               allInputs.addAll(getIncomingConnections());
+               allInputs.addAll(getBroadcastConnections());
+               
+               return allInputs;
+       }
+       
+       @Override
+       public String toString() {
+               StringBuilder bld = new StringBuilder();
+
+               bld.append(getName());
+               bld.append(" (").append(getOperator().getName()).append(") ");
+
+               int i = 1; 
+               for (DagConnection conn : getIncomingConnections()) {
+                       String shipStrategyName = conn.getShipStrategy() == 
null ? "null" : conn.getShipStrategy().name();
+                       
bld.append('(').append(i++).append(":").append(shipStrategyName).append(')');
+               }
+
+               return bld.toString();
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Description of an unclosed branch. An unclosed branch exists when the data flow
+        * branched (one operator's result is consumed by multiple targets), but the different
+        * branches (targets) have not yet been joined together.
+        */
+       public static final class UnclosedBranchDescriptor {
+
+               protected OptimizerNode branchingNode;
+
+               protected long joinedPathsVector;
+
+               /**
+                * Creates a new branching descriptor.
+                *
+                * @param branchingNode The node where the branch occurred (the node with multiple outputs).
+                * @param joinedPathsVector A bit vector describing which branches are tracked by this
+                *                          descriptor. A bit is one where the branch is tracked, zero otherwise.
+                */
+               protected UnclosedBranchDescriptor(OptimizerNode branchingNode, 
long joinedPathsVector) {
+                       this.branchingNode = branchingNode;
+                       this.joinedPathsVector = joinedPathsVector;
+               }
+
+               public OptimizerNode getBranchingNode() {
+                       return this.branchingNode;
+               }
+
+               public long getJoinedPathsVector() {
+                       return this.joinedPathsVector;
+               }
+
+               @Override
+               public String toString() {
+                       return "(" + this.branchingNode.getOperator() + ") [" + 
this.joinedPathsVector + "]";
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
new file mode 100644
index 0000000..5c811b0
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
+import 
org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * The optimizer's internal representation of a <i>Partition</i> operator node.
+ */
+public class PartitionNode extends SingleInputNode {
+
+       private final List<OperatorDescriptorSingle> possibleProperties;
+       
+       public PartitionNode(PartitionOperatorBase<?> operator) {
+               super(operator);
+               
+               OperatorDescriptorSingle descr = new PartitionDescriptor(
+                                       
this.getOperator().getPartitionMethod(), this.keys, 
operator.getCustomPartitioner());
+               this.possibleProperties = Collections.singletonList(descr);
+       }
+
+       @Override
+       public PartitionOperatorBase<?> getOperator() {
+               return (PartitionOperatorBase<?>) super.getOperator();
+       }
+
+       @Override
+       public String getName() {
+               return "Partition";
+       }
+
+       @Override
+       protected List<OperatorDescriptorSingle> getPossibleProperties() {
+               return this.possibleProperties;
+       }
+
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics 
statistics) {
+               // partitioning does not change the number of records
+               this.estimatedNumRecords = 
getPredecessorNode().getEstimatedNumRecords();
+               this.estimatedOutputSize = 
getPredecessorNode().getEstimatedOutputSize();
+       }
+       
+       @Override
+       public SemanticProperties getSemanticProperties() {
+               return new 
SingleInputSemanticProperties.AllFieldsForwardedProperties();
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       public static class PartitionDescriptor extends 
OperatorDescriptorSingle {
+
+               private final PartitionMethod pMethod;
+               private final Partitioner<?> customPartitioner;
+               
+               public PartitionDescriptor(PartitionMethod pMethod, FieldSet 
pKeys, Partitioner<?> customPartitioner) {
+                       super(pKeys);
+                       
+                       this.pMethod = pMethod;
+                       this.customPartitioner = customPartitioner;
+               }
+               
+               @Override
+               public DriverStrategy getStrategy() {
+                       return DriverStrategy.UNARY_NO_OP;
+               }
+
+               @Override
+               public SingleInputPlanNode instantiate(Channel in, 
SingleInputNode node) {
+                       return new SingleInputPlanNode(node, "Partition", in, 
DriverStrategy.UNARY_NO_OP);
+               }
+
+               @Override
+               protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
+                       RequestedGlobalProperties rgps = new 
RequestedGlobalProperties();
+                       
+                       switch (this.pMethod) {
+                       case HASH:
+                               rgps.setHashPartitioned(this.keys);
+                               break;
+                       case REBALANCE:
+                               rgps.setForceRebalancing();
+                               break;
+                       case CUSTOM:
+                               rgps.setCustomPartitioned(this.keys, 
this.customPartitioner);
+                               break;
+                       case RANGE:
+                               throw new UnsupportedOperationException("Not 
yet supported");
+                       default:
+                               throw new IllegalArgumentException("Invalid 
partition method");
+                       }
+                       
+                       return Collections.singletonList(rgps);
+               }
+
+               @Override
+               protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
+                       // partitioning does not require any local property.
+                       return Collections.singletonList(new 
RequestedLocalProperties());
+               }
+               
+               @Override
+               public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+                       // the partition node is a no-op, so all global properties are preserved
+                       return gProps;
+               }
+               
+               @Override
+               public LocalProperties computeLocalProperties(LocalProperties lProps) {
+                       // the partition node is a no-op, so all local properties are preserved
+                       return lProps;
+               }
+       }
+}
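
[A hedged usage sketch of the descriptor above: the requested global
properties per partition method are built with the same setters used in
createPossibleGlobalProperties(). The key field index 0 is an assumption for
illustration.]

    FieldSet keys = new FieldSet(0);               // assumed partition key: field 0

    RequestedGlobalProperties hash = new RequestedGlobalProperties();
    hash.setHashPartitioned(keys);                 // PartitionMethod.HASH

    RequestedGlobalProperties rebalance = new RequestedGlobalProperties();
    rebalance.setForceRebalancing();               // PartitionMethod.REBALANCE
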

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PlanCacheCleaner.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PlanCacheCleaner.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PlanCacheCleaner.java
new file mode 100644
index 0000000..bbe4607
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PlanCacheCleaner.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.util.Visitor;
+
+final class PlanCacheCleaner implements Visitor<OptimizerNode> {
+       
+       static final PlanCacheCleaner INSTANCE = new PlanCacheCleaner();
+
+       @Override
+       public boolean preVisit(OptimizerNode visitable) {
+               if (visitable.cachedPlans != null && 
visitable.isOnDynamicPath()) {
+                       visitable.cachedPlans = null;
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public void postVisit(OptimizerNode visitable) {}
+}
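
[For context, a minimal sketch of applying the visitor. The helper is assumed
to live in the same org.apache.flink.optimizer.dag package, since
PlanCacheCleaner is package-private; accept() comes from the Visitable
contract that OptimizerNode implements.]

    // clears cached plan candidates on the dynamic path below the given node
    static void clearPlanCaches(OptimizerNode root) {
        root.accept(PlanCacheCleaner.INSTANCE);
    }
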

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
new file mode 100644
index 0000000..1477038
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.AllReduceProperties;
+import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+import org.apache.flink.optimizer.operators.ReduceProperties;
+
+/**
+ * The Optimizer representation of a <i>Reduce</i> operator.
+ */
+public class ReduceNode extends SingleInputNode {
+       
+       private final List<OperatorDescriptorSingle> possibleProperties;
+       
+       private ReduceNode preReduceUtilityNode;
+       
+
+       public ReduceNode(ReduceOperatorBase<?, ?> operator) {
+               super(operator);
+               
+               if (this.keys == null) {
+                       // case of a key-less reducer; force a parallelism of 1
+                       setDegreeOfParallelism(1);
+               }
+               
+               OperatorDescriptorSingle props = this.keys == null ?
+                       new AllReduceProperties() :
+                       new ReduceProperties(this.keys, 
operator.getCustomPartitioner());
+               
+               this.possibleProperties = Collections.singletonList(props);
+       }
+       
+       public ReduceNode(ReduceNode reducerToCopyForCombiner) {
+               super(reducerToCopyForCombiner);
+               
+               this.possibleProperties = Collections.emptyList();
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public ReduceOperatorBase<?, ?> getOperator() {
+               return (ReduceOperatorBase<?, ?>) super.getOperator();
+       }
+
+       @Override
+       public String getName() {
+               return "Reduce";
+       }
+       
+       @Override
+       protected List<OperatorDescriptorSingle> getPossibleProperties() {
+               return this.possibleProperties;
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //  Estimates
+       // 
--------------------------------------------------------------------------------------------
+       
+       @Override
+       protected void computeOperatorSpecificDefaultEstimates(DataStatistics 
statistics) {
+               // no real estimates possible for a reducer.
+       }
+       
+       public ReduceNode getCombinerUtilityNode() {
+               if (this.preReduceUtilityNode == null) {
+                       this.preReduceUtilityNode = new ReduceNode(this);
+                       
+                       // we conservatively assume the combiner returns the 
same data size as it consumes 
+                       this.preReduceUtilityNode.estimatedOutputSize = 
getPredecessorNode().getEstimatedOutputSize();
+                       this.preReduceUtilityNode.estimatedNumRecords = 
getPredecessorNode().getEstimatedNumRecords();
+               }
+               return this.preReduceUtilityNode;
+       }
+}
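
[For illustration, a key-less reduce at the DataSet API level, which the
optimizer compiles into a ReduceNode with a forced parallelism of 1. A sketch
only; the environment and the data are assumptions.]

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Long> numbers = env.fromElements(1L, 2L, 3L);

    // no grouping keys, so the ReduceNode constructor above forces parallelism 1
    DataSet<Long> total = numbers.reduce(new ReduceFunction<Long>() {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    });
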
