http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java
deleted file mode 100644
index 7ae35c3..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-
-public class NoOpDescriptor extends OperatorDescriptorSingle {
-
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.UNARY_NO_OP;
-       }
-
-       @Override
-       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               return new SingleInputPlanNode(node, "Pipe", in, 
DriverStrategy.UNARY_NO_OP);
-       }
-
-
-       @Override
-       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
-               return Collections.singletonList(new 
RequestedGlobalProperties());
-       }
-
-
-       @Override
-       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
-               return Collections.singletonList(new 
RequestedLocalProperties());
-       }
-       
-
-       @Override
-       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
-               return gProps;
-       }
-       
-
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties lProps) {
-               return lProps;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
deleted file mode 100644
index c21593e..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.List;
-
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.TwoInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-
-/**
- * 
- */
-public abstract class OperatorDescriptorDual implements 
AbstractOperatorDescriptor {
-       
-       protected final FieldList keys1;
-       protected final FieldList keys2;
-       
-       private List<GlobalPropertiesPair> globalProps;
-       private List<LocalPropertiesPair> localProps;
-       
-       protected OperatorDescriptorDual() {
-               this(null, null);
-       }
-       
-       protected OperatorDescriptorDual(FieldList keys1, FieldList keys2) {
-               this.keys1 = keys1;
-               this.keys2 = keys2;
-       }
-       
-       public List<GlobalPropertiesPair> getPossibleGlobalProperties() {
-               if (this.globalProps == null) {
-                       this.globalProps = createPossibleGlobalProperties();
-               }
-               
-               return this.globalProps;
-       }
-       
-       public List<LocalPropertiesPair> getPossibleLocalProperties() {
-               if (this.localProps == null) {
-                       this.localProps = createPossibleLocalProperties();
-               }
-               
-               return this.localProps;
-       }
-       
-       protected abstract List<GlobalPropertiesPair> 
createPossibleGlobalProperties();
-       
-       protected abstract List<LocalPropertiesPair> 
createPossibleLocalProperties();
-       
-       public abstract boolean areCompatible(RequestedGlobalProperties 
requested1, RequestedGlobalProperties requested2,
-                       GlobalProperties produced1, GlobalProperties produced2);
-       
-       public abstract boolean areCoFulfilled(RequestedLocalProperties 
requested1, RequestedLocalProperties requested2,
-                       LocalProperties produced1, LocalProperties produced2);
-       
-       public abstract DualInputPlanNode instantiate(Channel in1, Channel in2, 
TwoInputNode node);
-       
-       public abstract GlobalProperties 
computeGlobalProperties(GlobalProperties in1, GlobalProperties in2);
-       
-       public abstract LocalProperties computeLocalProperties(LocalProperties 
in1, LocalProperties in2);
-
-       protected boolean checkEquivalentFieldPositionsInKeyFields(FieldList 
fields1, FieldList fields2) {
-
-               // check number of produced partitioning fields
-               if(fields1.size() != fields2.size()) {
-                       return false;
-               } else {
-                       return 
checkEquivalentFieldPositionsInKeyFields(fields1, fields2, fields1.size());
-               }
-       }
-
-       protected boolean checkEquivalentFieldPositionsInKeyFields(FieldList 
fields1, FieldList fields2, int numRelevantFields) {
-
-               // check number of produced partitioning fields
-               if(fields1.size() < numRelevantFields || fields2.size() < 
numRelevantFields) {
-                       return false;
-               }
-               else {
-                       for(int i=0; i<numRelevantFields; i++) {
-                               int pField1 = fields1.get(i);
-                               int pField2 = fields2.get(i);
-                               // check if position of both produced fields is 
the same in both requested fields
-                               int j;
-                               for(j=0; j<this.keys1.size(); j++) {
-                                       if(this.keys1.get(j) == pField1 && 
this.keys2.get(j) == pField2) {
-                                               break;
-                                       }
-                                       else if(this.keys1.get(j) != pField1 && 
this.keys2.get(j) != pField2) {
-                                               // do nothing
-                                       }
-                                       else {
-                                               return false;
-                                       }
-                               }
-                               if(j == this.keys1.size()) {
-                                       throw new CompilerException("Fields 
were not found in key fields.");
-                               }
-                       }
-               }
-               return true;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       public static final class GlobalPropertiesPair {
-               
-               private final RequestedGlobalProperties props1, props2;
-
-               public GlobalPropertiesPair(RequestedGlobalProperties props1, 
RequestedGlobalProperties props2) {
-                       this.props1 = props1;
-                       this.props2 = props2;
-               }
-               
-               public RequestedGlobalProperties getProperties1() {
-                       return this.props1;
-               }
-               
-               public RequestedGlobalProperties getProperties2() {
-                       return this.props2;
-               }
-               
-               @Override
-               public int hashCode() {
-                       return (this.props1 == null ? 0 : 
this.props1.hashCode()) ^ (this.props2 == null ? 0 : this.props2.hashCode());
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj.getClass() == GlobalPropertiesPair.class) {
-                               final GlobalPropertiesPair other = 
(GlobalPropertiesPair) obj;
-                               
-                               return (this.props1 == null ? other.props1 == 
null : this.props1.equals(other.props1)) &&
-                                               (this.props2 == null ? 
other.props2 == null : this.props2.equals(other.props2));
-                       }
-                       return false;
-               }
-               
-               @Override
-               public String toString() {
-                       return "{" + this.props1 + " / " + this.props2 + "}";
-               }
-       }
-       
-       public static final class LocalPropertiesPair {
-               
-               private final RequestedLocalProperties props1, props2;
-
-               public LocalPropertiesPair(RequestedLocalProperties props1, 
RequestedLocalProperties props2) {
-                       this.props1 = props1;
-                       this.props2 = props2;
-               }
-               
-               public RequestedLocalProperties getProperties1() {
-                       return this.props1;
-               }
-               
-               public RequestedLocalProperties getProperties2() {
-                       return this.props2;
-               }
-               
-               @Override
-               public int hashCode() {
-                       return (this.props1 == null ? 0 : 
this.props1.hashCode()) ^ (this.props2 == null ? 0 : this.props2.hashCode());
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj.getClass() == LocalPropertiesPair.class) {
-                               final LocalPropertiesPair other = 
(LocalPropertiesPair) obj;
-                               
-                               return (this.props1 == null ? other.props1 == 
null : this.props1.equals(other.props1)) &&
-                                               (this.props2 == null ? 
other.props2 == null : this.props2.equals(other.props2));
-                       }
-                       return false;
-               }
-
-               @Override
-               public String toString() {
-                       return "{" + this.props1 + " / " + this.props2 + "}";
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java
deleted file mode 100644
index c8be5d4..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.List;
-
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-
-/**
- * Abstract base class for Operator descriptions which instantiates the node 
and sets the driver
- * strategy and the sorting and grouping keys. Returns possible local and 
global properties and
- * updates them after the operation has been performed.
- * @see org.apache.flink.compiler.dag.SingleInputNode
- */
-public abstract class OperatorDescriptorSingle implements 
AbstractOperatorDescriptor {
-       
-       protected final FieldSet keys;                  // the set of key fields
-       protected final FieldList keyList;              // the key fields with 
ordered field positions
-
-       private List<RequestedGlobalProperties> globalProps;
-       private List<RequestedLocalProperties> localProps;
-       
-       
-       protected OperatorDescriptorSingle() {
-               this(null);
-       }
-       
-       protected OperatorDescriptorSingle(FieldSet keys) {
-               this.keys = keys;
-               this.keyList = keys == null ? null : keys.toFieldList();
-       }
-
-
-       public List<RequestedGlobalProperties> getPossibleGlobalProperties() {
-               if (this.globalProps == null) {
-                       this.globalProps = createPossibleGlobalProperties();
-               }
-               return this.globalProps;
-       }
-       
-       public List<RequestedLocalProperties> getPossibleLocalProperties() {
-               if (this.localProps == null) {
-                       this.localProps = createPossibleLocalProperties();
-               }
-               return this.localProps;
-       }
-
-       /**
-        * Returns a list of global properties that are required by this 
operator descriptor.
-        * 
-        * @return A list of global properties that are required by this 
operator descriptor.
-        */
-       protected abstract List<RequestedGlobalProperties> 
createPossibleGlobalProperties();
-       
-       /**
-        * Returns a list of local properties that are required by this 
operator descriptor.
-        * 
-        * @return A list of local properties that are required by this 
operator descriptor.
-        */
-       protected abstract List<RequestedLocalProperties> 
createPossibleLocalProperties();
-       
-       public abstract SingleInputPlanNode instantiate(Channel in, 
SingleInputNode node);
-       
-       /**
-        * Returns the global properties which are present after the operator 
was applied on the 
-        * provided global properties.
-        * 
-        * @param in The global properties on which the operator is applied.
-        * @return The global properties which are valid after the operator has 
been applied.
-        */
-       public abstract GlobalProperties 
computeGlobalProperties(GlobalProperties in);
-       
-       /**
-        * Returns the local properties which are present after the operator 
was applied on the 
-        * provided local properties.
-        * 
-        * @param in The local properties on which the operator is applied.
-        * @return The local properties which are valid after the operator has 
been applied.
-        */
-       public abstract LocalProperties computeLocalProperties(LocalProperties 
in);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
deleted file mode 100644
index 2bde29b..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.dag.GroupReduceNode;
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-public final class PartialGroupProperties extends OperatorDescriptorSingle {
-       
-       public PartialGroupProperties(FieldSet keys) {
-               super(keys);
-       }
-       
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.SORTED_GROUP_COMBINE;
-       }
-
-       @Override
-       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               // create in input node for combine with same DOP as input node
-               GroupReduceNode combinerNode = new 
GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getOperator());
-               
combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
-
-               SingleInputPlanNode combiner = new 
SingleInputPlanNode(combinerNode, "Combine("+node.getOperator().getName()+")", 
in,
-                               DriverStrategy.SORTED_GROUP_COMBINE);
-               // sorting key info
-               combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), 
in.getLocalStrategySortOrder(), 0);
-               // set grouping comparator key info
-               combiner.setDriverKeyInfo(this.keyList, 1);
-               
-               return combiner;
-       }
-
-       @Override
-       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
-               return Collections.singletonList(new 
RequestedGlobalProperties());
-       }
-
-       @Override
-       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
-               RequestedLocalProperties props = new RequestedLocalProperties();
-               props.setGroupedFields(this.keys);
-               return Collections.singletonList(props);
-       }
-       
-       @Override
-       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
-               if (gProps.getUniqueFieldCombination() != null && 
gProps.getUniqueFieldCombination().size() > 0 &&
-                               gProps.getPartitioning() == 
PartitioningProperty.RANDOM_PARTITIONED)
-               {
-                       
gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
-               }
-               gProps.clearUniqueFieldCombinations();
-               return gProps;
-       }
-       
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties lProps) {
-               return lProps.clearUniqueFieldSets();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
deleted file mode 100644
index 5bb51f3..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.ReduceNode;
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-
-public final class ReduceProperties extends OperatorDescriptorSingle {
-       
-       private final Partitioner<?> customPartitioner;
-       
-       public ReduceProperties(FieldSet keys) {
-               this(keys, null);
-       }
-       
-       public ReduceProperties(FieldSet keys, Partitioner<?> 
customPartitioner) {
-               super(keys);
-               this.customPartitioner = customPartitioner;
-       }
-       
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.SORTED_REDUCE;
-       }
-
-       @Override
-       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
-                               (node.getBroadcastConnections() != null && 
!node.getBroadcastConnections().isEmpty()))
-               {
-                       return new SingleInputPlanNode(node, "Reduce 
("+node.getOperator().getName()+")", in,
-                                                                               
        DriverStrategy.SORTED_REDUCE, this.keyList);
-               }
-               else {
-                       // non forward case. all local properties are killed 
anyways, so we can safely plug in a combiner
-                       Channel toCombiner = new Channel(in.getSource());
-                       toCombiner.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                       
-                       // create an input node for combine with same DOP as 
input node
-                       ReduceNode combinerNode = ((ReduceNode) 
node).getCombinerUtilityNode();
-                       
combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
-
-                       SingleInputPlanNode combiner = new 
SingleInputPlanNode(combinerNode,
-                                                               "Combine 
("+node.getOperator().getName()+")", toCombiner,
-                                                               
DriverStrategy.SORTED_PARTIAL_REDUCE, this.keyList);
-
-                       combiner.setCosts(new Costs(0, 0));
-                       
combiner.initProperties(toCombiner.getGlobalProperties(), 
toCombiner.getLocalProperties());
-                       
-                       Channel toReducer = new Channel(combiner);
-                       toReducer.setShipStrategy(in.getShipStrategy(), 
in.getShipStrategyKeys(),
-                                                                               
in.getShipStrategySortOrder(), in.getDataExchangeMode());
-                       toReducer.setLocalStrategy(LocalStrategy.SORT, 
in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
-
-                       return new SingleInputPlanNode(node, 
"Reduce("+node.getOperator().getName()+")", toReducer,
-                                                                               
        DriverStrategy.SORTED_REDUCE, this.keyList);
-               }
-       }
-
-       @Override
-       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
-               RequestedGlobalProperties props = new 
RequestedGlobalProperties();
-               if (customPartitioner == null) {
-                       props.setAnyPartitioning(this.keys);
-               } else {
-                       props.setCustomPartitioned(this.keys, 
this.customPartitioner);
-               }
-               return Collections.singletonList(props);
-       }
-
-       @Override
-       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
-               RequestedLocalProperties props = new RequestedLocalProperties();
-               props.setGroupedFields(this.keys);
-               return Collections.singletonList(props);
-       }
-
-       @Override
-       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
-               if (gProps.getUniqueFieldCombination() != null && 
gProps.getUniqueFieldCombination().size() > 0 &&
-                               gProps.getPartitioning() == 
PartitioningProperty.RANDOM_PARTITIONED)
-               {
-                       
gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
-               }
-               gProps.clearUniqueFieldCombinations();
-               return gProps;
-       }
-
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties lProps) {
-               return lProps.clearUniqueFieldSets();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java
deleted file mode 100644
index 1dcd87d..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
- *
- */
-public class SolutionSetDeltaOperator extends OperatorDescriptorSingle {
-
-       public SolutionSetDeltaOperator(FieldList partitioningFields) {
-               super(partitioningFields);
-       }
-       
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.UNARY_NO_OP;
-       }
-
-       @Override
-       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               return new SingleInputPlanNode(node, "SolutionSet Delta", in, 
DriverStrategy.UNARY_NO_OP);
-       }
-
-       @Override
-       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
-               RequestedGlobalProperties partProps = new 
RequestedGlobalProperties();
-               partProps.setHashPartitioned(this.keyList);
-               return Collections.singletonList(partProps);
-       }
-
-       @Override
-       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
-               return Collections.singletonList(new 
RequestedLocalProperties());
-       }
-       
-       @Override
-       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
-               return gProps;
-       }
-       
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties lProps) {
-               return lProps;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
deleted file mode 100644
index 356836a..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.TwoInputNode;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.util.Utils;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
- * 
- */
-public class SortMergeJoinDescriptor extends AbstractJoinDescriptor {
-       
-       public SortMergeJoinDescriptor(FieldList keys1, FieldList keys2) {
-               super(keys1, keys2);
-       }
-       
-       public SortMergeJoinDescriptor(FieldList keys1, FieldList keys2,
-                       boolean broadcastFirstAllowed, boolean 
broadcastSecondAllowed, boolean repartitionAllowed)
-       {
-               super(keys1, keys2, broadcastFirstAllowed, 
broadcastSecondAllowed, repartitionAllowed);
-       }
-
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.MERGE;
-       }
-
-       @Override
-       protected List<LocalPropertiesPair> createPossibleLocalProperties() {
-               RequestedLocalProperties sort1 = new 
RequestedLocalProperties(Utils.createOrdering(this.keys1));
-               RequestedLocalProperties sort2 = new 
RequestedLocalProperties(Utils.createOrdering(this.keys2));
-               return Collections.singletonList(new LocalPropertiesPair(sort1, 
sort2));
-       }
-
-       @Override
-       public boolean areCoFulfilled(RequestedLocalProperties requested1, 
RequestedLocalProperties requested2,
-                       LocalProperties produced1, LocalProperties produced2)
-       {
-               int numRelevantFields = this.keys1.size();
-               
-               Ordering prod1 = produced1.getOrdering();
-               Ordering prod2 = produced2.getOrdering();
-               
-               if (prod1 == null || prod2 == null) {
-                       throw new CompilerException("The given properties do 
not meet this operators requirements.");
-               }
-
-               // check that order of fields is equivalent
-               if (!checkEquivalentFieldPositionsInKeyFields(
-                               prod1.getInvolvedIndexes(), 
prod2.getInvolvedIndexes(), numRelevantFields)) {
-                       return false;
-               }
-
-               // check that both inputs have the same directions of order
-               for (int i = 0; i < numRelevantFields; i++) {
-                       if (prod1.getOrder(i) != prod2.getOrder(i)) {
-                               return false;
-                       }
-               }
-               return true;
-       }
-       
-       @Override
-       public DualInputPlanNode instantiate(Channel in1, Channel in2, 
TwoInputNode node) {
-               boolean[] inputOrders = 
in1.getLocalProperties().getOrdering().getFieldSortDirections();
-               
-               if (inputOrders == null || inputOrders.length < 
this.keys1.size()) {
-                       throw new CompilerException("BUG: The input strategy 
does not sufficiently describe the sort orders for a merge operator.");
-               } else if (inputOrders.length > this.keys1.size()) {
-                       boolean[] tmp = new boolean[this.keys1.size()];
-                       System.arraycopy(inputOrders, 0, tmp, 0, tmp.length);
-                       inputOrders = tmp;
-               }
-               
-               return new DualInputPlanNode(node, 
"Join("+node.getOperator().getName()+")", in1, in2, DriverStrategy.MERGE, 
this.keys1, this.keys2, inputOrders);
-       }
-
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties in1, 
LocalProperties in2) {
-               LocalProperties comb = LocalProperties.combine(in1, in2);
-               return comb.clearUniqueFieldSets();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java
deleted file mode 100644
index c42cff2..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.SinkJoiner;
-import org.apache.flink.optimizer.dag.TwoInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkJoinerPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
- *
- */
-public class UtilSinkJoinOpDescriptor extends OperatorDescriptorDual {
-       
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.BINARY_NO_OP;
-       }
-       
-       @Override
-       protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
-               // all properties are possible
-               return Collections.singletonList(new GlobalPropertiesPair(
-                       new RequestedGlobalProperties(), new 
RequestedGlobalProperties()));
-       }
-
-       @Override
-       protected List<LocalPropertiesPair> createPossibleLocalProperties() {
-               // all properties are possible
-               return Collections.singletonList(new LocalPropertiesPair(
-                       new RequestedLocalProperties(), new 
RequestedLocalProperties()));
-       }
-       
-       @Override
-       public boolean areCompatible(RequestedGlobalProperties requested1, 
RequestedGlobalProperties requested2,
-                       GlobalProperties produced1, GlobalProperties produced2) 
{
-               return true;
-       }
-       
-       @Override
-       public boolean areCoFulfilled(RequestedLocalProperties requested1, 
RequestedLocalProperties requested2,
-                       LocalProperties produced1, LocalProperties produced2) {
-               return true;
-       }
-
-       @Override
-       public DualInputPlanNode instantiate(Channel in1, Channel in2, 
TwoInputNode node) {
-               if (node instanceof SinkJoiner) {
-                       return new SinkJoinerPlanNode((SinkJoiner) node, in1, 
in2);
-               } else {
-                       throw new CompilerException();
-               }
-       }
-
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties in1, 
LocalProperties in2) {
-               return new LocalProperties();
-       }
-
-       @Override
-       public GlobalProperties computeGlobalProperties(GlobalProperties in1, 
GlobalProperties in2) {
-               return GlobalProperties.combine(in1, in2);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
deleted file mode 100644
index bf22fb3..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plan;
-
-import org.apache.flink.optimizer.dag.BinaryUnionNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
- * A special subclass for the union to make it identifiable.
- */
-public class BinaryUnionPlanNode extends DualInputPlanNode {
-       
-       /**
-        * @param template
-        */
-       public BinaryUnionPlanNode(BinaryUnionNode template, Channel in1, 
Channel in2) {
-               super(template, "Union", in1, in2, DriverStrategy.UNION);
-       }
-       
-       public BinaryUnionPlanNode(BinaryUnionPlanNode toSwapFrom) {
-               super(toSwapFrom.getOptimizerNode(), "Union-With-Cached", 
toSwapFrom.getInput2(), toSwapFrom.getInput1(),
-                               DriverStrategy.UNION_WITH_CACHED);
-               
-               this.globalProps = toSwapFrom.globalProps;
-               this.localProps = toSwapFrom.localProps;
-               this.nodeCosts = toSwapFrom.nodeCosts;
-               this.cumulativeCosts = toSwapFrom.cumulativeCosts;
-               
-               setParallelism(toSwapFrom.getParallelism());
-       }
-       
-       public BinaryUnionNode getOptimizerNode() {
-               return (BinaryUnionNode) this.template;
-       }
-       
-       public boolean unionsStaticAndDynamicPath() {
-               return getInput1().isOnDynamicPath() != 
getInput2().isOnDynamicPath();
-       }
-       
-       @Override
-       public int getMemoryConsumerWeight() {
-               return 0;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java
deleted file mode 100644
index e79e2f3..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
-
-import java.util.HashMap;
-
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.BulkIterationNode;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.util.Visitor;
-
-public class BulkIterationPlanNode extends SingleInputPlanNode implements 
IterationPlanNode {
-       
-       private final BulkPartialSolutionPlanNode partialSolutionPlanNode;
-       
-       private final PlanNode rootOfStepFunction;
-       
-       private PlanNode rootOfTerminationCriterion;
-       
-       private TypeSerializerFactory<?> serializerForIterationChannel;
-       
-       // 
--------------------------------------------------------------------------------------------
-
-       public BulkIterationPlanNode(BulkIterationNode template, String 
nodeName, Channel input,
-                       BulkPartialSolutionPlanNode pspn, PlanNode 
rootOfStepFunction)
-       {
-               super(template, nodeName, input, DriverStrategy.NONE);
-               this.partialSolutionPlanNode = pspn;
-               this.rootOfStepFunction = rootOfStepFunction;
-
-               mergeBranchPlanMaps();
-       }
-       
-       public BulkIterationPlanNode(BulkIterationNode template, String 
nodeName, Channel input,
-                       BulkPartialSolutionPlanNode pspn, PlanNode 
rootOfStepFunction, PlanNode rootOfTerminationCriterion)
-       {
-               this(template, nodeName, input, pspn, rootOfStepFunction);
-               this.rootOfTerminationCriterion = rootOfTerminationCriterion;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       public BulkIterationNode getIterationNode() {
-               if (this.template instanceof BulkIterationNode) {
-                       return (BulkIterationNode) this.template;
-               } else {
-                       throw new RuntimeException();
-               }
-       }
-       
-       public BulkPartialSolutionPlanNode getPartialSolutionPlanNode() {
-               return this.partialSolutionPlanNode;
-       }
-       
-       public PlanNode getRootOfStepFunction() {
-               return this.rootOfStepFunction;
-       }
-       
-       public PlanNode getRootOfTerminationCriterion() {
-               return this.rootOfTerminationCriterion;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-
-       
-       public TypeSerializerFactory<?> getSerializerForIterationChannel() {
-               return serializerForIterationChannel;
-       }
-       
-       public void setSerializerForIterationChannel(TypeSerializerFactory<?> 
serializerForIterationChannel) {
-               this.serializerForIterationChannel = 
serializerForIterationChannel;
-       }
-
-       public void setCosts(Costs nodeCosts) {
-               // add the costs from the step function
-               
nodeCosts.addCosts(this.rootOfStepFunction.getCumulativeCosts());
-               
-               // add the costs for the termination criterion, if it exists
-               // the costs are divided at branches, so we can simply add them 
up
-               if (rootOfTerminationCriterion != null) {
-                       
nodeCosts.addCosts(this.rootOfTerminationCriterion.getCumulativeCosts());
-               }
-               
-               super.setCosts(nodeCosts);
-       }
-       
-       public int getMemoryConsumerWeight() {
-               return 1;
-       }
-       
-
-       @Override
-       public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
-               if (source == this) {
-                       return FOUND_SOURCE;
-               }
-               
-               SourceAndDamReport fromOutside = 
super.hasDamOnPathDownTo(source);
-
-               if (fromOutside == FOUND_SOURCE_AND_DAM) {
-                       return FOUND_SOURCE_AND_DAM;
-               }
-               else if (fromOutside == FOUND_SOURCE) {
-                       // we always have a dam in the back channel
-                       return FOUND_SOURCE_AND_DAM;
-               } else {
-                       // check the step function for dams
-                       return 
this.rootOfStepFunction.hasDamOnPathDownTo(source);
-               }
-       }
-
-       @Override
-       public void acceptForStepFunction(Visitor<PlanNode> visitor) {
-               this.rootOfStepFunction.accept(visitor);
-               
-               if(this.rootOfTerminationCriterion != null) {
-                       this.rootOfTerminationCriterion.accept(visitor);
-               }
-       }
-
-       private void mergeBranchPlanMaps() {
-               for (OptimizerNode.UnclosedBranchDescriptor desc: 
template.getOpenBranches()) {
-                       OptimizerNode brancher = desc.getBranchingNode();
-
-                       if (branchPlan == null) {
-                               branchPlan = new HashMap<OptimizerNode, 
PlanNode>(6);
-                       }
-                       
-                       if (!branchPlan.containsKey(brancher)) {
-                               PlanNode selectedCandidate = null;
-
-                               if (rootOfStepFunction.branchPlan != null) {
-                                       selectedCandidate = 
rootOfStepFunction.branchPlan.get(brancher);
-                               }
-
-                               if (selectedCandidate == null) {
-                                       throw new CompilerException(
-                                                       "Candidates for a node 
with open branches are missing information about the selected candidate ");
-                               }
-
-                               this.branchPlan.put(brancher, 
selectedCandidate);
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java
deleted file mode 100644
index df05b64..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
-
-import java.util.Collections;
-import java.util.HashMap;
-
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.BulkPartialSolutionNode;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.runtime.operators.DamBehavior;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.util.Visitor;
-
-/**
- * Plan candidate node for partial solution of a bulk iteration.
- */
-public class BulkPartialSolutionPlanNode extends PlanNode {
-       
-       private static final Costs NO_COSTS = new Costs();
-       
-       private BulkIterationPlanNode containingIterationNode;
-       
-       private Channel initialInput;
-       
-       public Object postPassHelper;
-       
-       
-       public BulkPartialSolutionPlanNode(BulkPartialSolutionNode template, 
String nodeName,
-                       GlobalProperties gProps, LocalProperties lProps,
-                       Channel initialInput)
-       {
-               super(template, nodeName, DriverStrategy.NONE);
-               
-               this.globalProps = gProps;
-               this.localProps = lProps;
-               this.initialInput = initialInput;
-               
-               // the partial solution does not cost anything
-               this.nodeCosts = NO_COSTS;
-               this.cumulativeCosts = NO_COSTS;
-               
-               if (initialInput.getSource().branchPlan != null && 
initialInput.getSource().branchPlan.size() > 0) {
-                       if (this.branchPlan == null) {
-                               this.branchPlan = new HashMap<OptimizerNode, 
PlanNode>();
-                       }
-                       
-                       
this.branchPlan.putAll(initialInput.getSource().branchPlan);
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public BulkPartialSolutionNode getPartialSolutionNode() {
-               return (BulkPartialSolutionNode) this.template;
-       }
-       
-       public BulkIterationPlanNode getContainingIterationNode() {
-               return this.containingIterationNode;
-       }
-       
-       public void setContainingIterationNode(BulkIterationPlanNode 
containingIterationNode) {
-               this.containingIterationNode = containingIterationNode;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public void accept(Visitor<PlanNode> visitor) {
-               if (visitor.preVisit(this)) {
-                       visitor.postVisit(this);
-               }
-       }
-
-       @Override
-       public Iterable<PlanNode> getPredecessors() {
-               return Collections.<PlanNode>emptyList();
-       }
-
-       @Override
-       public Iterable<Channel> getInputs() {
-               return Collections.<Channel>emptyList();
-       }
-
-       @Override
-       public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
-               if (source == this) {
-                       return FOUND_SOURCE;
-               }
-               SourceAndDamReport res = 
this.initialInput.getSource().hasDamOnPathDownTo(source);
-               if (res == FOUND_SOURCE_AND_DAM) {
-                       return FOUND_SOURCE_AND_DAM;
-               }
-               else if (res == FOUND_SOURCE) {
-                       return (this.initialInput.getLocalStrategy().dams() || 
-                                       
this.initialInput.getTempMode().breaksPipeline() ||
-                                       getDriverStrategy().firstDam() == 
DamBehavior.FULL_DAM) ?
-                               FOUND_SOURCE_AND_DAM : FOUND_SOURCE;
-               }
-               else {
-                       return NOT_FOUND;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java
deleted file mode 100644
index 875d1c3..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java
+++ /dev/null
@@ -1,538 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plan;
-
-import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.EstimateProvider;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plandump.DumpableConnection;
-import org.apache.flink.optimizer.util.Utils;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-
-/**
- * A Channel represents the result produced by an operator and the data 
exchange
- * before the consumption by the target operator.
- *
- * The channel defines and tracks various properties and characteristics of the
- * data set and data exchange.
- *
- * Data set characteristics:
- * <ul>
- *     <li>The "global properties" of the data, i.e., how the data is 
distributed across
- *         partitions</li>
- *     <li>The "required global properties" of the data, i.e., the global 
properties that, if absent,
- *         would cause the program to return a wrong result.</li>
- *     <li>The "local properties" of the data, i.e., how the data is organized 
within a partition</li>
- *     <li>The "required local properties" of the data, i.e., the local 
properties that, if absent,
- *         would cause the program to return a wrong result.</li>
- * </ul>
- *
- * Data exchange parameters:
- * <ul>
- *     <li>The "ship strategy", i.e., whether to forward the data, shuffle it, 
broadcast it, ...</li>
- *     <li>The "ship keys", which are the positions of the key fields in the 
exchanged records.</li>
- *     <li>The "data exchange mode", which defines whether to pipeline or 
batch the exchange</li>
- *     <li>Several more...</li>
- * </ul>
- */
-public class Channel implements EstimateProvider, Cloneable, 
DumpableConnection<PlanNode> {
-       
-       private PlanNode source;
-       
-       private PlanNode target;
-
-       private ShipStrategyType shipStrategy = ShipStrategyType.NONE;
-
-       private DataExchangeMode dataExchangeMode;
-       
-       private LocalStrategy localStrategy = LocalStrategy.NONE;
-       
-       private FieldList shipKeys;
-       
-       private FieldList localKeys;
-       
-       private boolean[] shipSortOrder;
-       
-       private boolean[] localSortOrder;
-       
-       private RequestedGlobalProperties requiredGlobalProps;
-       
-       private RequestedLocalProperties requiredLocalProps;
-       
-       private GlobalProperties globalProps;
-       
-       private LocalProperties localProps;
-       
-       private TypeSerializerFactory<?> serializer;
-       
-       private TypeComparatorFactory<?> shipStrategyComparator;
-       
-       private TypeComparatorFactory<?> localStrategyComparator;
-       
-       private DataDistribution dataDistribution;
-       
-       private Partitioner<?> partitioner;
-       
-       private TempMode tempMode;
-       
-       private double relativeTempMemory;
-       
-       private double relativeMemoryLocalStrategy;
-       
-       private int replicationFactor = 1;
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public Channel(PlanNode sourceNode) {
-               this(sourceNode, null);
-       }
-       
-       public Channel(PlanNode sourceNode, TempMode tempMode) {
-               this.source = sourceNode;
-               this.tempMode = (tempMode == null ? TempMode.NONE : tempMode);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //                                         Accessors
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Gets the source of this Channel.
-        *
-        * @return The source.
-        */
-       @Override
-       public PlanNode getSource() {
-               return this.source;
-       }
-       
-       /**
-        * Sets the target of this Channel.
-        *
-        * @param target The target.
-        */
-       public void setTarget(PlanNode target) {
-               this.target = target;
-       }
-       
-       /**
-        * Gets the target of this Channel.
-        *
-        * @return The target.
-        */
-       public PlanNode getTarget() {
-               return this.target;
-       }
-
-       public void setShipStrategy(ShipStrategyType strategy, DataExchangeMode 
dataExchangeMode) {
-               setShipStrategy(strategy, null, null, null, dataExchangeMode);
-       }
-       
-       public void setShipStrategy(ShipStrategyType strategy, FieldList keys, 
DataExchangeMode dataExchangeMode) {
-               setShipStrategy(strategy, keys, null, null, dataExchangeMode);
-       }
-       
-       public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
-                                                               boolean[] 
sortDirection, DataExchangeMode dataExchangeMode) {
-               setShipStrategy(strategy, keys, sortDirection, null, 
dataExchangeMode);
-       }
-       
-       public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
-                                                               Partitioner<?> 
partitioner, DataExchangeMode dataExchangeMode) {
-               setShipStrategy(strategy, keys, null, partitioner, 
dataExchangeMode);
-       }
-       
-       public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
-                                                               boolean[] 
sortDirection, Partitioner<?> partitioner,
-                                                               
DataExchangeMode dataExchangeMode) {
-               this.shipStrategy = strategy;
-               this.shipKeys = keys;
-               this.shipSortOrder = sortDirection;
-               this.partitioner = partitioner;
-               this.dataExchangeMode = dataExchangeMode;
-               this.globalProps = null;                // reset the global 
properties
-       }
-
-       /**
-        * Gets the data exchange mode (batch / streaming) to use for the data
-        * exchange of this channel.
-        *
-        * @return The data exchange mode of this channel.
-        */
-       public DataExchangeMode getDataExchangeMode() {
-               return dataExchangeMode;
-       }
-
-       public ShipStrategyType getShipStrategy() {
-               return this.shipStrategy;
-       }
-       
-       public FieldList getShipStrategyKeys() {
-               return this.shipKeys;
-       }
-       
-       public boolean[] getShipStrategySortOrder() {
-               return this.shipSortOrder;
-       }
-       
-       public void setLocalStrategy(LocalStrategy strategy) {
-               setLocalStrategy(strategy, null, null);
-       }
-       
-       public void setLocalStrategy(LocalStrategy strategy, FieldList keys, 
boolean[] sortDirection) {
-               this.localStrategy = strategy;
-               this.localKeys = keys;
-               this.localSortOrder = sortDirection;
-               this.localProps = null;         // reset the local properties
-       }
-       
-       public LocalStrategy getLocalStrategy() {
-               return this.localStrategy;
-       }
-       
-       public FieldList getLocalStrategyKeys() {
-               return this.localKeys;
-       }
-       
-       public boolean[] getLocalStrategySortOrder() {
-               return this.localSortOrder;
-       }
-       
-       public void setDataDistribution(DataDistribution dataDistribution) {
-               this.dataDistribution = dataDistribution;
-       }
-       
-       public DataDistribution getDataDistribution() {
-               return this.dataDistribution;
-       }
-       
-       public Partitioner<?> getPartitioner() {
-               return partitioner;
-       }
-       
-       public TempMode getTempMode() {
-               return this.tempMode;
-       }
-
-       /**
-        * Sets the temp mode of the connection.
-        * 
-        * @param tempMode
-        *        The temp mode of the connection.
-        */
-       public void setTempMode(TempMode tempMode) {
-               this.tempMode = tempMode;
-       }
-       
-       /**
-        * Gets the memory for materializing the channel's result from this 
Channel.
-        *
-        * @return The temp memory.
-        */
-       public double getRelativeTempMemory() {
-               return this.relativeTempMemory;
-       }
-       
-       /**
-        * Sets the memory for materializing the channel's result from this 
Channel.
-        *
-        * @param relativeTempMemory The memory for materialization.
-        */
-       public void setRelativeTempMemory(double relativeTempMemory) {
-               this.relativeTempMemory = relativeTempMemory;
-       }
-       
-       /**
-        * Sets the replication factor of the connection.
-        * 
-        * @param factor The replication factor of the connection.
-        */
-       public void setReplicationFactor(int factor) {
-               this.replicationFactor = factor;
-       }
-       
-       /**
-        * Returns the replication factor of the connection.
-        * 
-        * @return The replication factor of the connection.
-        */
-       public int getReplicationFactor() {
-               return this.replicationFactor;
-       }
-       
-       /**
-        * Gets the serializer from this Channel.
-        *
-        * @return The serializer.
-        */
-       public TypeSerializerFactory<?> getSerializer() {
-               return serializer;
-       }
-       
-       /**
-        * Sets the serializer for this Channel.
-        *
-        * @param serializer The serializer to set.
-        */
-       public void setSerializer(TypeSerializerFactory<?> serializer) {
-               this.serializer = serializer;
-       }
-       
-       /**
-        * Gets the ship strategy comparator from this Channel.
-        *
-        * @return The ship strategy comparator.
-        */
-       public TypeComparatorFactory<?> getShipStrategyComparator() {
-               return shipStrategyComparator;
-       }
-       
-       /**
-        * Sets the ship strategy comparator for this Channel.
-        *
-        * @param shipStrategyComparator The ship strategy comparator to set.
-        */
-       public void setShipStrategyComparator(TypeComparatorFactory<?> 
shipStrategyComparator) {
-               this.shipStrategyComparator = shipStrategyComparator;
-       }
-       
-       /**
-        * Gets the local strategy comparator from this Channel.
-        *
-        * @return The local strategy comparator.
-        */
-       public TypeComparatorFactory<?> getLocalStrategyComparator() {
-               return localStrategyComparator;
-       }
-       
-       /**
-        * Sets the local strategy comparator for this Channel.
-        *
-        * @param localStrategyComparator The local strategy comparator to set.
-        */
-       public void setLocalStrategyComparator(TypeComparatorFactory<?> 
localStrategyComparator) {
-               this.localStrategyComparator = localStrategyComparator;
-       }
-       
-       public double getRelativeMemoryLocalStrategy() {
-               return relativeMemoryLocalStrategy;
-       }
-       
-       public void setRelativeMemoryLocalStrategy(double 
relativeMemoryLocalStrategy) {
-               this.relativeMemoryLocalStrategy = relativeMemoryLocalStrategy;
-       }
-       
-       public boolean isOnDynamicPath() {
-               return this.source.isOnDynamicPath();
-       }
-       
-       public int getCostWeight() {
-               return this.source.getCostWeight();
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //                                Statistic Estimates
-       // 
--------------------------------------------------------------------------------------------
-       
-
-       @Override
-       public long getEstimatedOutputSize() {
-               long estimate = this.source.template.getEstimatedOutputSize();
-               return estimate < 0 ? estimate : estimate * 
this.replicationFactor;
-       }
-
-       @Override
-       public long getEstimatedNumRecords() {
-               long estimate =  this.source.template.getEstimatedNumRecords();
-               return estimate < 0 ? estimate : estimate * 
this.replicationFactor;
-       }
-       
-       @Override
-       public float getEstimatedAvgWidthPerOutputRecord() {
-               return 
this.source.template.getEstimatedAvgWidthPerOutputRecord();
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //                                Data Property Handling
-       // 
--------------------------------------------------------------------------------------------
-       
-
-       public RequestedGlobalProperties getRequiredGlobalProps() {
-               return requiredGlobalProps;
-       }
-
-       public void setRequiredGlobalProps(RequestedGlobalProperties 
requiredGlobalProps) {
-               this.requiredGlobalProps = requiredGlobalProps;
-       }
-
-       public RequestedLocalProperties getRequiredLocalProps() {
-               return requiredLocalProps;
-       }
-
-       public void setRequiredLocalProps(RequestedLocalProperties 
requiredLocalProps) {
-               this.requiredLocalProps = requiredLocalProps;
-       }
-
-       public GlobalProperties getGlobalProperties() {
-               if (this.globalProps == null) {
-                       this.globalProps = 
this.source.getGlobalProperties().clone();
-                       switch (this.shipStrategy) {
-                               case BROADCAST:
-                                       
this.globalProps.clearUniqueFieldCombinations();
-                                       this.globalProps.setFullyReplicated();
-                                       break;
-                               case PARTITION_HASH:
-                                       
this.globalProps.setHashPartitioned(this.shipKeys);
-                                       break;
-                               case PARTITION_RANGE:
-                                       
this.globalProps.setRangePartitioned(Utils.createOrdering(this.shipKeys, 
this.shipSortOrder));
-                                       break;
-                               case FORWARD:
-                                       break;
-                               case PARTITION_RANDOM:
-                                       this.globalProps.reset();
-                                       break;
-                               case PARTITION_FORCED_REBALANCE:
-                                       this.globalProps.setForcedRebalanced();
-                                       break;
-                               case PARTITION_CUSTOM:
-                                       
this.globalProps.setCustomPartitioned(this.shipKeys, this.partitioner);
-                                       break;
-                               case NONE:
-                                       throw new CompilerException("Cannot 
produce GlobalProperties before ship strategy is set.");
-                       }
-               }
-               
-               return this.globalProps;
-       }
-       
-       public LocalProperties getLocalProperties() {
-               if (this.localProps == null) {
-                       computeLocalPropertiesAfterShippingOnly();
-                       switch (this.localStrategy) {
-                               case NONE:
-                                       break;
-                               case SORT:
-                               case COMBININGSORT:
-                                       this.localProps = 
LocalProperties.forOrdering(Utils.createOrdering(this.localKeys, 
this.localSortOrder));
-                                       break;
-                               default:
-                                       throw new 
CompilerException("Unsupported local strategy for channel.");
-                       }
-               }
-               
-               return this.localProps;
-       }
-       
-       private void computeLocalPropertiesAfterShippingOnly() {
-               switch (this.shipStrategy) {
-                       case BROADCAST:
-                       case PARTITION_HASH:
-                       case PARTITION_CUSTOM:
-                       case PARTITION_RANGE:
-                       case PARTITION_RANDOM:
-                       case PARTITION_FORCED_REBALANCE:
-                               this.localProps = new LocalProperties();
-                               break;
-                       case FORWARD:
-                               this.localProps = 
this.source.getLocalProperties();
-                               break;
-                       case NONE:
-                               throw new CompilerException("ShipStrategy has 
not yet been set.");
-                       default:
-                               throw new CompilerException("Unknown 
ShipStrategy.");
-               }
-       }
-       
-       public void adjustGlobalPropertiesForFullParallelismChange() {
-               if (this.shipStrategy == null || this.shipStrategy == 
ShipStrategyType.NONE) {
-                       throw new IllegalStateException("Cannot adjust channel 
for degree of parallelism " +
-                                       "change before the ship strategy is 
set.");
-               }
-               
-               // make sure the properties are acquired
-               if (this.globalProps == null) {
-                       getGlobalProperties();
-               }
-               
-               // some strategies globally reestablish properties
-               switch (this.shipStrategy) {
-               case FORWARD:
-                       throw new CompilerException("Cannot use FORWARD 
strategy between operations " +
-                                       "with different number of parallel 
instances.");
-               case NONE: // excluded by sanity check. left here for 
verification check completion
-               case BROADCAST:
-               case PARTITION_HASH:
-               case PARTITION_RANGE:
-               case PARTITION_RANDOM:
-               case PARTITION_FORCED_REBALANCE:
-               case PARTITION_CUSTOM:
-                       return;
-               }
-               throw new CompilerException("Unrecognized Ship Strategy Type: " 
+ this.shipStrategy);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Utility method used while swapping binary union nodes for n-ary 
union nodes.
-        */
-       public void swapUnionNodes(PlanNode newUnionNode) {
-               if (!(this.source instanceof BinaryUnionPlanNode)) {
-                       throw new IllegalStateException();
-               } else {
-                       this.source = newUnionNode;
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public int getMaxDepth() {
-               return this.source.getOptimizerNode().getMaxDepth() + 1;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public String toString() {
-               return "Channel (" + this.source + (this.target == null ? ')' : 
") -> (" + this.target + ')') +
-                               '[' + this.shipStrategy + "] [" + 
this.localStrategy + "] " +
-                               (this.tempMode == null || this.tempMode == 
TempMode.NONE ? "{NO-TEMP}" : this.tempMode);
-       }
-
-       @Override
-       public Channel clone() {
-               try {
-                       return (Channel) super.clone();
-               } catch (CloneNotSupportedException cnsex) {
-                       throw new RuntimeException(cnsex);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
deleted file mode 100644
index 01c56dd..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
-import static 
org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.TwoInputNode;
-import org.apache.flink.runtime.operators.DamBehavior;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.Visitor;
-
-/**
- *
- */
-public class DualInputPlanNode extends PlanNode {
-       
-       protected final Channel input1;
-       protected final Channel input2;
-       
-       protected final FieldList keys1;
-       protected final FieldList keys2;
-       
-       protected final boolean[] sortOrders;
-       
-       private TypeComparatorFactory<?> comparator1;
-       private TypeComparatorFactory<?> comparator2;
-       private TypePairComparatorFactory<?, ?> pairComparator;
-       
-       public Object postPassHelper1;
-       public Object postPassHelper2;
-       
-       // 
--------------------------------------------------------------------------------------------
-
-       public DualInputPlanNode(OptimizerNode template, String nodeName, 
Channel input1, Channel input2, DriverStrategy diverStrategy) {
-               this(template, nodeName, input1, input2, diverStrategy, null, 
null, null);
-       }
-       
-       public DualInputPlanNode(OptimizerNode template, String nodeName, 
Channel input1, Channel input2,
-                       DriverStrategy diverStrategy, FieldList 
driverKeyFields1, FieldList driverKeyFields2)
-       {
-               this(template, nodeName, input1, input2, diverStrategy, 
driverKeyFields1, driverKeyFields2,
-                                                                       
SingleInputPlanNode.getTrueArray(driverKeyFields1.size()));
-       }
-       
-       public DualInputPlanNode(OptimizerNode template, String nodeName, 
Channel input1, Channel input2, DriverStrategy diverStrategy,
-                       FieldList driverKeyFields1, FieldList driverKeyFields2, 
boolean[] driverSortOrders)
-       {
-               super(template, nodeName, diverStrategy);
-               this.input1 = input1;
-               this.input2 = input2;
-               this.keys1 = driverKeyFields1;
-               this.keys2 = driverKeyFields2;
-               this.sortOrders = driverSortOrders;
-               
-               if (this.input1.getShipStrategy() == 
ShipStrategyType.BROADCAST) {
-                       this.input1.setReplicationFactor(getParallelism());
-               }
-               if (this.input2.getShipStrategy() == 
ShipStrategyType.BROADCAST) {
-                       this.input2.setReplicationFactor(getParallelism());
-               }
-               
-               mergeBranchPlanMaps(input1.getSource(), input2.getSource());
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       public TwoInputNode getTwoInputNode() {
-               if (this.template instanceof TwoInputNode) {
-                       return (TwoInputNode) this.template;
-               } else {
-                       throw new RuntimeException();
-               }
-       }
-       
-       public FieldList getKeysForInput1() {
-               return this.keys1;
-       }
-       
-       public FieldList getKeysForInput2() {
-               return this.keys2;
-       }
-       
-       public boolean[] getSortOrders() {
-               return this.sortOrders;
-       }
-       
-       public TypeComparatorFactory<?> getComparator1() {
-               return this.comparator1;
-       }
-       
-       public TypeComparatorFactory<?> getComparator2() {
-               return this.comparator2;
-       }
-       
-       public void setComparator1(TypeComparatorFactory<?> comparator) {
-               this.comparator1 = comparator;
-       }
-       
-       public void setComparator2(TypeComparatorFactory<?> comparator) {
-               this.comparator2 = comparator;
-       }
-       
-       public TypePairComparatorFactory<?, ?> getPairComparator() {
-               return this.pairComparator;
-       }
-       
-       public void setPairComparator(TypePairComparatorFactory<?, ?> 
comparator) {
-               this.pairComparator = comparator;
-       }
-       
-       /**
-        * Gets the first input channel to this node.
-        * 
-        * @return The first input channel to this node.
-        */
-       public Channel getInput1() {
-               return this.input1;
-       }
-       
-       /**
-        * Gets the second input channel to this node.
-        * 
-        * @return The second input channel to this node.
-        */
-       public Channel getInput2() {
-               return this.input2;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-
-       @Override
-       public void accept(Visitor<PlanNode> visitor) {
-               if (visitor.preVisit(this)) {
-                       this.input1.getSource().accept(visitor);
-                       this.input2.getSource().accept(visitor);
-                       
-                       for (Channel broadcastInput : getBroadcastInputs()) {
-                               broadcastInput.getSource().accept(visitor);
-                       }
-                       
-                       visitor.postVisit(this);
-               }
-       }
-       
-
-       @Override
-       public Iterable<PlanNode> getPredecessors() {
-               if (getBroadcastInputs() == null || 
getBroadcastInputs().isEmpty()) {
-                       return Arrays.asList(this.input1.getSource(), 
this.input2.getSource());
-               } else {
-                       List<PlanNode> preds = new ArrayList<PlanNode>();
-                       
-                       preds.add(input1.getSource());
-                       preds.add(input2.getSource());
-
-                       for (Channel c : getBroadcastInputs()) {
-                               preds.add(c.getSource());
-                       }
-                       
-                       return preds;
-               }
-       }
-
-       @Override
-       public Iterable<Channel> getInputs() {
-               return Arrays.asList(this.input1, this.input2);
-       }
-
-
-       @Override
-       public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
-               if (source == this) {
-                       return FOUND_SOURCE;
-               }
-               
-               // check first input
-               SourceAndDamReport res1 = 
this.input1.getSource().hasDamOnPathDownTo(source);
-               if (res1 == FOUND_SOURCE_AND_DAM) {
-                       return FOUND_SOURCE_AND_DAM;
-               }
-               else if (res1 == FOUND_SOURCE) {
-                       if (this.input1.getLocalStrategy().dams() || 
this.input1.getTempMode().breaksPipeline() ||
-                                       getDriverStrategy().firstDam() == 
DamBehavior.FULL_DAM) {
-                               return FOUND_SOURCE_AND_DAM;
-                       } else {
-                               return FOUND_SOURCE;
-                       }
-               }
-               else {
-                       SourceAndDamReport res2 = 
this.input2.getSource().hasDamOnPathDownTo(source);
-                       if (res2 == FOUND_SOURCE_AND_DAM) {
-                               return FOUND_SOURCE_AND_DAM;
-                       }
-                       else if (res2 == FOUND_SOURCE) {
-                               if (this.input2.getLocalStrategy().dams() || 
this.input2.getTempMode().breaksPipeline() ||
-                                               getDriverStrategy().secondDam() 
== DamBehavior.FULL_DAM) {
-                                       return FOUND_SOURCE_AND_DAM;
-                               } else {
-                                       return FOUND_SOURCE;
-                               }
-                       }
-                       else {
-                               // NOT_FOUND
-                               // check the broadcast inputs
-                               
-                               for (NamedChannel nc : getBroadcastInputs()) {
-                                       SourceAndDamReport bcRes = 
nc.getSource().hasDamOnPathDownTo(source);
-                                       if (bcRes != NOT_FOUND) {
-                                               // broadcast inputs are always 
dams
-                                               return FOUND_SOURCE_AND_DAM;
-                                       }
-                               }
-                               return NOT_FOUND;
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java
deleted file mode 100644
index d146c83..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plan;
-
-/**
- * A common interface for compiled Flink plans for both batch and streaming
- * processing programs.
- * 
- */
-public interface FlinkPlan {
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java
deleted file mode 100644
index 38f76b2..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plan;
-
-import org.apache.flink.optimizer.dag.IterationNode;
-import org.apache.flink.util.Visitor;
-
-/**
- *
- */
-public interface IterationPlanNode {
-       
-       void acceptForStepFunction(Visitor<PlanNode> visitor);
-       
-       IterationNode getIterationNode();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java
deleted file mode 100644
index 3650eea..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.BinaryUnionNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.util.IterableIterator;
-import org.apache.flink.util.Visitor;
-
-/**
- * A union operation over multiple inputs (2 or more).
- */
-public class NAryUnionPlanNode extends PlanNode {
-       
-       private final List<Channel> inputs;
-       
-       /**
-        * @param template
-        */
-       public NAryUnionPlanNode(BinaryUnionNode template, List<Channel> 
inputs, GlobalProperties gProps,
-                       Costs cumulativeCosts)
-       {
-               super(template, "Union", DriverStrategy.NONE);
-               
-               this.inputs = inputs;
-               this.globalProps = gProps;
-               this.localProps = new LocalProperties();
-               this.nodeCosts = new Costs();
-               this.cumulativeCosts = cumulativeCosts;
-       }
-
-       @Override
-       public void accept(Visitor<PlanNode> visitor) {
-               visitor.preVisit(this);
-               for (Channel c : this.inputs) {
-                       c.getSource().accept(visitor);
-               }
-               visitor.postVisit(this);
-       }
-       
-       public List<Channel> getListOfInputs() {
-               return this.inputs;
-       }
-
-       @Override
-       public Iterable<Channel> getInputs() {
-               return Collections.unmodifiableList(this.inputs);
-       }
-
-       @Override
-       public Iterable<PlanNode> getPredecessors() {
-               final Iterator<Channel> channels = this.inputs.iterator();
-               return new IterableIterator<PlanNode>() {
-
-                       @Override
-                       public boolean hasNext() {
-                               return channels.hasNext();
-                       }
-
-                       @Override
-                       public PlanNode next() {
-                               return channels.next().getSource();
-                       }
-
-                       @Override
-                       public void remove() {
-                               throw new UnsupportedOperationException();
-                       }
-                       
-                       @Override
-                       public Iterator<PlanNode> iterator() {
-                               return this;
-                       }
-               };
-       }
-
-       @Override
-       public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
-               // this node is used after the plan enumeration. consequently, 
this will never be invoked here
-               throw new UnsupportedOperationException();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java
deleted file mode 100644
index da97e61..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import org.apache.flink.optimizer.dag.TempMode;
-
-public class NamedChannel extends Channel {
-
-       private final String name;
-
-       /**
-        * Initializes NamedChannel.
-        * 
-        * @param sourceNode
-        */
-       public NamedChannel(String name, PlanNode sourceNode) {
-               super(sourceNode);
-               this.name = name;
-       }
-
-       public NamedChannel(String name, PlanNode sourceNode, TempMode 
tempMode) {
-               super(sourceNode, tempMode);
-               this.name = name;
-       }
-
-       public String getName() {
-               return this.name;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
deleted file mode 100644
index d56be87..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plan;
-
-import java.util.Collection;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.util.Visitable;
-import org.apache.flink.util.Visitor;
-
-/**
- * The execution plan generated by the Optimizer. It contains {@link PlanNode}s
- * and {@link Channel}s that describe exactly how the program should be 
executed.
- * It defines all ship strategies (local pipe, shuffle, broadcast, rebalance), 
all
- * operator strategies (sorting-merge join, hash join, sorted grouping, ...),
- * and the data exchange modes (batched, pipelined).
- */
-public class OptimizedPlan implements FlinkPlan, Visitable<PlanNode>  {
-       
-       /** The data sources in the plan. */
-       private final Collection<SourcePlanNode> dataSources;
-
-       /** The data sinks in the plan. */
-       private final Collection<SinkPlanNode> dataSinks;
-
-       /** All nodes in the optimizer plan. */
-       private final Collection<PlanNode> allNodes;
-       
-       /** The original program. */
-       private final Plan originalProgram;
-
-       /** Name of the job */
-       private final String jobName;
-
-       /**
-        * Creates a new instance of this optimizer plan container. The plan is 
given and fully
-        * described by the data sources, sinks and the collection of all nodes.
-        * 
-        * @param sources The data sources.
-        * @param sinks The data sinks.
-        * @param allNodes A collection containing all nodes in the plan.
-        * @param jobName The name of the program
-        */
-       public OptimizedPlan(Collection<SourcePlanNode> sources, 
Collection<SinkPlanNode> sinks,
-                       Collection<PlanNode> allNodes, String jobName, Plan 
programPlan)
-       {
-               this.dataSources = sources;
-               this.dataSinks = sinks;
-               this.allNodes = allNodes;
-               this.jobName = jobName;
-               this.originalProgram = programPlan;
-       }
-
-       /**
-        * Gets the data sources from this OptimizedPlan.
-        * 
-        * @return The data sources.
-        */
-       public Collection<SourcePlanNode> getDataSources() {
-               return dataSources;
-       }
-
-       /**
-        * Gets the data sinks from this OptimizedPlan.
-        * 
-        * @return The data sinks.
-        */
-       public Collection<SinkPlanNode> getDataSinks() {
-               return dataSinks;
-       }
-
-       /**
-        * Gets all the nodes from this OptimizedPlan.
-        * 
-        * @return All nodes.
-        */
-       public Collection<PlanNode> getAllNodes() {
-               return allNodes;
-       }
-
-       /**
-        * Returns the name of the program.
-        * 
-        * @return The name of the program.
-        */
-       public String getJobName() {
-               return this.jobName;
-       }
-       
-       /**
-        * Gets the original program plan from which this optimized plan was 
created.
-        * 
-        * @return The original program plan.
-        */
-       public Plan getOriginalPactPlan() {
-               return this.originalProgram;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Applies the given visitor top down to all nodes, starting at the 
sinks.
-        * 
-        * @param visitor
-        *        The visitor to apply to the nodes in this plan.
-        * @see 
org.apache.flink.util.Visitable#accept(org.apache.flink.util.Visitor)
-        */
-       @Override
-       public void accept(Visitor<PlanNode> visitor) {
-               for (SinkPlanNode node : this.dataSinks) {
-                       node.accept(visitor);
-               }
-       }
-}

Reply via email to