http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
deleted file mode 100644
index b3c083a..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.GroupReduceNode;
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-
-public final class AllGroupWithPartialPreGroupProperties extends 
OperatorDescriptorSingle {
-
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.ALL_GROUP_REDUCE;
-       }
-
-       @Override
-       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
-                       // locally connected, directly instantiate
-                       return new SingleInputPlanNode(node, "GroupReduce 
("+node.getOperator().getName()+")",
-                                                                               
        in, DriverStrategy.ALL_GROUP_REDUCE);
-               } else {
-                       // non forward case.plug in a combiner
-                       Channel toCombiner = new Channel(in.getSource());
-                       toCombiner.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                       
-                       // create an input node for combine with same DOP as 
input node
-                       GroupReduceNode combinerNode = ((GroupReduceNode) 
node).getCombinerUtilityNode();
-                       
combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
-
-                       SingleInputPlanNode combiner = new 
SingleInputPlanNode(combinerNode,
-                                       "Combine 
("+node.getOperator().getName()+")", toCombiner, 
DriverStrategy.ALL_GROUP_REDUCE_COMBINE);
-                       combiner.setCosts(new Costs(0, 0));
-                       
combiner.initProperties(toCombiner.getGlobalProperties(), 
toCombiner.getLocalProperties());
-                       
-                       Channel toReducer = new Channel(combiner);
-                       toReducer.setShipStrategy(in.getShipStrategy(), 
in.getShipStrategyKeys(),
-                                                                               
in.getShipStrategySortOrder(), in.getDataExchangeMode());
-
-                       toReducer.setLocalStrategy(in.getLocalStrategy(), 
in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
-                       return new SingleInputPlanNode(node, "GroupReduce 
("+node.getOperator().getName()+")",
-                                                                               
        toReducer, DriverStrategy.ALL_GROUP_REDUCE);
-               }
-       }
-
-       @Override
-       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
-               return Collections.singletonList(new 
RequestedGlobalProperties());
-       }
-
-       @Override
-       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
-               return Collections.singletonList(new 
RequestedLocalProperties());
-       }
-       
-       @Override
-       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
-               if (gProps.getUniqueFieldCombination() != null && 
gProps.getUniqueFieldCombination().size() > 0 &&
-                               gProps.getPartitioning() == 
PartitioningProperty.RANDOM_PARTITIONED)
-               {
-                       
gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
-               }
-               gProps.clearUniqueFieldCombinations();
-               return gProps;
-       }
-
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties lProps) {
-               return lProps.clearUniqueFieldSets();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
deleted file mode 100644
index a172a60..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.ReduceNode;
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-
-public final class AllReduceProperties extends OperatorDescriptorSingle {
-
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.ALL_REDUCE;
-       }
-
-       @Override
-       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
-                       // locally connected, directly instantiate
-                       return new SingleInputPlanNode(node, "Reduce 
("+node.getOperator().getName()+")",
-                                                                               
        in, DriverStrategy.ALL_REDUCE);
-               } else {
-                       // non forward case.plug in a combiner
-                       Channel toCombiner = new Channel(in.getSource());
-                       toCombiner.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-                       
-                       // create an input node for combine with same DOP as 
input node
-                       ReduceNode combinerNode = ((ReduceNode) 
node).getCombinerUtilityNode();
-                       
combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
-
-                       SingleInputPlanNode combiner = new 
SingleInputPlanNode(combinerNode,
-                                       "Combine 
("+node.getOperator().getName()+")", toCombiner, DriverStrategy.ALL_REDUCE);
-                       combiner.setCosts(new Costs(0, 0));
-                       
combiner.initProperties(toCombiner.getGlobalProperties(), 
toCombiner.getLocalProperties());
-                       
-                       Channel toReducer = new Channel(combiner);
-                       toReducer.setShipStrategy(in.getShipStrategy(), 
in.getShipStrategyKeys(),
-                                                                               
in.getShipStrategySortOrder(), in.getDataExchangeMode());
-                       toReducer.setLocalStrategy(in.getLocalStrategy(), 
in.getLocalStrategyKeys(),
-                                                                               
in.getLocalStrategySortOrder());
-
-                       return new SingleInputPlanNode(node, "Reduce 
("+node.getOperator().getName()+")",
-                                                                               
        toReducer, DriverStrategy.ALL_REDUCE);
-               }
-       }
-       
-       @Override
-       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
-               return Collections.singletonList(new 
RequestedGlobalProperties());
-       }
-
-       @Override
-       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
-               return Collections.singletonList(new 
RequestedLocalProperties());
-       }
-       
-       @Override
-       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
-               return new GlobalProperties();
-       }
-       
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties lProps) {
-               return new LocalProperties();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java
deleted file mode 100644
index 8cc517e..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.optimizer.dag.BinaryUnionNode;
-import org.apache.flink.optimizer.dag.TwoInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.BinaryUnionPlanNode;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
- *
- */
-public class BinaryUnionOpDescriptor extends OperatorDescriptorDual {
-       
-       public BinaryUnionOpDescriptor() {
-               super();
-       }
-
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.UNION;
-       }
-
-       @Override
-       protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
-               return Collections.emptyList();
-       }
-       
-       @Override
-       protected List<LocalPropertiesPair> createPossibleLocalProperties() {
-               return Collections.emptyList();
-       }
-
-       @Override
-       public DualInputPlanNode instantiate(Channel in1, Channel in2, 
TwoInputNode node) {
-               return new BinaryUnionPlanNode((BinaryUnionNode) node, in1, 
in2);
-       }
-
-       @Override
-       public GlobalProperties computeGlobalProperties(GlobalProperties in1, 
GlobalProperties in2) {
-               GlobalProperties newProps = new GlobalProperties();
-               
-               if (in1.getPartitioning() == 
PartitioningProperty.HASH_PARTITIONED &&
-                       in2.getPartitioning() == 
PartitioningProperty.HASH_PARTITIONED &&
-                       
in1.getPartitioningFields().equals(in2.getPartitioningFields()))
-               {
-                       
newProps.setHashPartitioned(in1.getPartitioningFields());
-               }
-               
-               return newProps;
-       }
-       
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties in1, 
LocalProperties in2) {
-               // all local properties are destroyed
-               return new LocalProperties();
-       }
-
-       @Override
-       public boolean areCoFulfilled(RequestedLocalProperties requested1, 
RequestedLocalProperties requested2,
-                       LocalProperties produced1, LocalProperties produced2) {
-               return true;
-       }
-       
-       @Override
-       public boolean areCompatible(RequestedGlobalProperties requested1, 
RequestedGlobalProperties requested2,
-                       GlobalProperties produced1, GlobalProperties produced2) 
{
-               return true;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
deleted file mode 100644
index f48e297..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.optimizer.dag.TwoInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-
-
-public abstract class CartesianProductDescriptor extends 
OperatorDescriptorDual {
-       
-       private final boolean allowBroadcastFirst;
-       private final boolean allowBroadcastSecond;
-       
-       
-       protected CartesianProductDescriptor(boolean allowBroadcastFirst, 
boolean allowBroadcastSecond) {
-               if (!(allowBroadcastFirst | allowBroadcastSecond)) {
-                       throw new IllegalArgumentException();
-               }
-
-               this.allowBroadcastFirst = allowBroadcastFirst;
-               this.allowBroadcastSecond = allowBroadcastSecond;
-       }
-       
-       
-       @Override
-       protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
-               ArrayList<GlobalPropertiesPair> pairs = new 
ArrayList<GlobalPropertiesPair>();
-               
-               if (this.allowBroadcastFirst) {
-                       // replicate first
-                       RequestedGlobalProperties replicated1 = new 
RequestedGlobalProperties();
-                       replicated1.setFullyReplicated();
-                       RequestedGlobalProperties any2 = new 
RequestedGlobalProperties();
-                       pairs.add(new GlobalPropertiesPair(replicated1, any2));
-               }
-               
-               if (this.allowBroadcastSecond) {
-                       // replicate second
-                       RequestedGlobalProperties any1 = new 
RequestedGlobalProperties();
-                       RequestedGlobalProperties replicated2 = new 
RequestedGlobalProperties();
-                       replicated2.setFullyReplicated();
-                       pairs.add(new GlobalPropertiesPair(any1, replicated2));
-               }
-
-               return pairs;
-       }
-       
-       @Override
-       protected List<LocalPropertiesPair> createPossibleLocalProperties() {
-               // all properties are possible
-               return Collections.singletonList(new LocalPropertiesPair(
-                       new RequestedLocalProperties(), new 
RequestedLocalProperties()));
-       }
-       
-       @Override
-       public boolean areCompatible(RequestedGlobalProperties requested1, 
RequestedGlobalProperties requested2,
-                       GlobalProperties produced1, GlobalProperties produced2) 
{
-               return true;
-       }
-
-       @Override
-       public boolean areCoFulfilled(RequestedLocalProperties requested1, 
RequestedLocalProperties requested2,
-                       LocalProperties produced1, LocalProperties produced2) {
-               return true;
-       }
-       
-       @Override
-       public DualInputPlanNode instantiate(Channel in1, Channel in2, 
TwoInputNode node) {
-               return new DualInputPlanNode(node, 
"Cross("+node.getOperator().getName()+")", in1, in2, getStrategy());
-       }
-       
-       @Override
-       public GlobalProperties computeGlobalProperties(GlobalProperties in1, 
GlobalProperties in2) {
-               GlobalProperties gp = GlobalProperties.combine(in1, in2);
-               if (gp.getUniqueFieldCombination() != null && 
gp.getUniqueFieldCombination().size() > 0 &&
-                                       gp.getPartitioning() == 
PartitioningProperty.RANDOM_PARTITIONED)
-               {
-                       
gp.setAnyPartitioning(gp.getUniqueFieldCombination().iterator().next().toFieldList());
-               }
-               gp.clearUniqueFieldCombinations();
-               return gp;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
deleted file mode 100644
index 368944e..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.TwoInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.util.Utils;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-public class CoGroupDescriptor extends OperatorDescriptorDual {
-       
-       private final Ordering ordering1;               // ordering on the 
first input 
-       private final Ordering ordering2;               // ordering on the 
second input 
-       
-       private Partitioner<?> customPartitioner;
-       
-       
-       public CoGroupDescriptor(FieldList keys1, FieldList keys2) {
-               this(keys1, keys2, null, null);
-       }
-       
-       public CoGroupDescriptor(FieldList keys1, FieldList keys2, Ordering 
additionalOrdering1, Ordering additionalOrdering2) {
-               super(keys1, keys2);
-               
-               // if we have an additional ordering, construct the ordering to 
have primarily the grouping fields
-               if (additionalOrdering1 != null) {
-                       this.ordering1 = new Ordering();
-                       for (Integer key : this.keys1) {
-                               this.ordering1.appendOrdering(key, null, 
Order.ANY);
-                       }
-               
-                       // and next the additional order fields
-                       for (int i = 0; i < 
additionalOrdering1.getNumberOfFields(); i++) {
-                               Integer field = 
additionalOrdering1.getFieldNumber(i);
-                               Order order = additionalOrdering1.getOrder(i);
-                               this.ordering1.appendOrdering(field, 
additionalOrdering1.getType(i), order);
-                       }
-               } else {
-                       this.ordering1 = Utils.createOrdering(this.keys1);
-               }
-               
-               // if we have an additional ordering, construct the ordering to 
have primarily the grouping fields
-               if (additionalOrdering2 != null) {
-                       this.ordering2 = new Ordering();
-                       for (Integer key : this.keys2) {
-                               this.ordering2.appendOrdering(key, null, 
Order.ANY);
-                       }
-               
-                       // and next the additional order fields
-                       for (int i = 0; i < 
additionalOrdering2.getNumberOfFields(); i++) {
-                               Integer field = 
additionalOrdering2.getFieldNumber(i);
-                               Order order = additionalOrdering2.getOrder(i);
-                               this.ordering2.appendOrdering(field, 
additionalOrdering2.getType(i), order);
-                       }
-               } else {
-                       this.ordering2 = Utils.createOrdering(this.keys2);
-               }
-       }
-       
-       public void setCustomPartitioner(Partitioner<?> customPartitioner) {
-               this.customPartitioner = customPartitioner;
-       }
-       
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.CO_GROUP;
-       }
-
-       @Override
-       protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
-
-               if (this.customPartitioner == null) {
-
-                       // we accept compatible partitionings of any type
-                       RequestedGlobalProperties partitioned_left_any = new 
RequestedGlobalProperties();
-                       RequestedGlobalProperties partitioned_right_any = new 
RequestedGlobalProperties();
-                       partitioned_left_any.setAnyPartitioning(this.keys1);
-                       partitioned_right_any.setAnyPartitioning(this.keys2);
-
-                       // add strict hash partitioning of both inputs on their 
full key sets
-                       RequestedGlobalProperties partitioned_left_hash = new 
RequestedGlobalProperties();
-                       RequestedGlobalProperties partitioned_right_hash = new 
RequestedGlobalProperties();
-                       partitioned_left_hash.setHashPartitioned(this.keys1);
-                       partitioned_right_hash.setHashPartitioned(this.keys2);
-
-                       return Arrays.asList(new 
GlobalPropertiesPair(partitioned_left_any, partitioned_right_any),
-                                       new 
GlobalPropertiesPair(partitioned_left_hash, partitioned_right_hash));
-               }
-               else {
-                       RequestedGlobalProperties partitioned_left = new 
RequestedGlobalProperties();
-                       partitioned_left.setCustomPartitioned(this.keys1, 
this.customPartitioner);
-
-                       RequestedGlobalProperties partitioned_right = new 
RequestedGlobalProperties();
-                       partitioned_right.setCustomPartitioned(this.keys2, 
this.customPartitioner);
-
-                       return Collections.singletonList(new 
GlobalPropertiesPair(partitioned_left, partitioned_right));
-               }
-       }
-       
-       @Override
-       protected List<LocalPropertiesPair> createPossibleLocalProperties() {
-               RequestedLocalProperties sort1 = new 
RequestedLocalProperties(this.ordering1);
-               RequestedLocalProperties sort2 = new 
RequestedLocalProperties(this.ordering2);
-               return Collections.singletonList(new LocalPropertiesPair(sort1, 
sort2));
-       }
-       
-       @Override
-       public boolean areCompatible(RequestedGlobalProperties requested1, 
RequestedGlobalProperties requested2,
-                       GlobalProperties produced1, GlobalProperties produced2)
-       {
-
-               if(produced1.getPartitioning() == 
PartitioningProperty.HASH_PARTITIONED &&
-                               produced2.getPartitioning() == 
PartitioningProperty.HASH_PARTITIONED) {
-
-                       // both are hash partitioned, check that partitioning 
fields are equivalently chosen
-                       return checkEquivalentFieldPositionsInKeyFields(
-                                       produced1.getPartitioningFields(), 
produced2.getPartitioningFields());
-
-               }
-               else if(produced1.getPartitioning() == 
PartitioningProperty.RANGE_PARTITIONED &&
-                               produced2.getPartitioning() == 
PartitioningProperty.RANGE_PARTITIONED) {
-
-                       // both are range partitioned, check that partitioning 
fields are equivalently chosen
-                       return checkEquivalentFieldPositionsInKeyFields(
-                                       produced1.getPartitioningFields(), 
produced2.getPartitioningFields());
-
-               }
-               else if(produced1.getPartitioning() == 
PartitioningProperty.CUSTOM_PARTITIONING &&
-                               produced2.getPartitioning() == 
PartitioningProperty.CUSTOM_PARTITIONING) {
-
-                       // both use a custom partitioner. Check that both keys 
are exactly as specified and that both the same partitioner
-                       return 
produced1.getPartitioningFields().isExactMatch(this.keys1) &&
-                                       
produced2.getPartitioningFields().isExactMatch(this.keys2) &&
-                                       produced1.getCustomPartitioner() != 
null && produced2.getCustomPartitioner() != null &&
-                                       
produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner());
-
-               }
-               else {
-
-                       // no other partitioning valid, incl. ANY_PARTITIONING.
-                       //   For co-groups we must ensure that both sides are 
exactly identically partitioned, ANY is not good enough.
-                       return false;
-               }
-
-       }
-       
-       @Override
-       public boolean areCoFulfilled(RequestedLocalProperties requested1, 
RequestedLocalProperties requested2,
-                       LocalProperties produced1, LocalProperties produced2)
-       {
-               int numRelevantFields = this.keys1.size();
-               
-               Ordering prod1 = produced1.getOrdering();
-               Ordering prod2 = produced2.getOrdering();
-               
-               if (prod1 == null || prod2 == null) {
-                       throw new CompilerException("The given properties do 
not meet this operators requirements.");
-               }
-
-               // check that order of fields is equivalent
-               if (!checkEquivalentFieldPositionsInKeyFields(
-                               prod1.getInvolvedIndexes(), 
prod2.getInvolvedIndexes(), numRelevantFields)) {
-                       return false;
-               }
-
-               // check that order directions are equivalent
-               for (int i = 0; i < numRelevantFields; i++) {
-                       if (prod1.getOrder(i) != prod2.getOrder(i)) {
-                               return false;
-                       }
-               }
-               return true;
-       }
-
-       @Override
-       public DualInputPlanNode instantiate(Channel in1, Channel in2, 
TwoInputNode node) {
-               boolean[] inputOrders = in1.getLocalProperties().getOrdering() 
== null ? null : 
in1.getLocalProperties().getOrdering().getFieldSortDirections();
-               
-               if (inputOrders == null || inputOrders.length < 
this.keys1.size()) {
-                       throw new CompilerException("BUG: The input strategy 
does not sufficiently describe the sort orders for a CoGroup operator.");
-               } else if (inputOrders.length > this.keys1.size()) {
-                       boolean[] tmp = new boolean[this.keys1.size()];
-                       System.arraycopy(inputOrders, 0, tmp, 0, tmp.length);
-                       inputOrders = tmp;
-               }
-               
-               return new DualInputPlanNode(node, "CoGroup 
("+node.getOperator().getName()+")", in1, in2, DriverStrategy.CO_GROUP, 
this.keys1, this.keys2, inputOrders);
-       }
-       
-       @Override
-       public GlobalProperties computeGlobalProperties(GlobalProperties in1, 
GlobalProperties in2) {
-               GlobalProperties gp = GlobalProperties.combine(in1, in2);
-               if (gp.getUniqueFieldCombination() != null && 
gp.getUniqueFieldCombination().size() > 0 &&
-                                       gp.getPartitioning() == 
PartitioningProperty.RANDOM_PARTITIONED)
-               {
-                       
gp.setAnyPartitioning(gp.getUniqueFieldCombination().iterator().next().toFieldList());
-               }
-               gp.clearUniqueFieldCombinations();
-               return gp;
-       }
-
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties in1, 
LocalProperties in2) {
-               LocalProperties comb = LocalProperties.combine(in1, in2);
-               return comb.clearUniqueFieldSets();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java
deleted file mode 100644
index 8e7edeb..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.TwoInputNode;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.util.Utils;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
- * 
- */
-public class CoGroupWithSolutionSetFirstDescriptor extends CoGroupDescriptor {
-       
-       public CoGroupWithSolutionSetFirstDescriptor(FieldList keys1, FieldList 
keys2) {
-               super(keys1, keys2);
-       }
-       
-       @Override
-       protected List<LocalPropertiesPair> createPossibleLocalProperties() {
-               RequestedLocalProperties none = new RequestedLocalProperties();
-               RequestedLocalProperties sort = new 
RequestedLocalProperties(Utils.createOrdering(this.keys2));
-               return Collections.singletonList(new LocalPropertiesPair(none, 
sort));
-       }
-
-       @Override
-       public DualInputPlanNode instantiate(Channel in1, Channel in2, 
TwoInputNode node) {
-               boolean[] inputOrders = in2.getLocalProperties().getOrdering() 
== null ? null : 
in2.getLocalProperties().getOrdering().getFieldSortDirections();
-
-               if (inputOrders == null || inputOrders.length < 
this.keys2.size()) {
-                       throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a CoGroup operator.");
-               } else if (inputOrders.length > this.keys2.size()) {
-                       boolean[] tmp = new boolean[this.keys2.size()];
-                       System.arraycopy(inputOrders, 0, tmp, 0, tmp.length);
-                       inputOrders = tmp;
-               }
-
-               return new DualInputPlanNode(node, "CoGroup ("+node.getOperator().getName()+")", in1, in2, DriverStrategy.CO_GROUP, this.keys1, this.keys2, inputOrders);
-       }
-
-       @Override
-       public boolean areCoFulfilled(RequestedLocalProperties requested1, 
RequestedLocalProperties requested2,
-                       LocalProperties produced1, LocalProperties produced2)
-       {
-               return true;
-       }
-
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties in1, 
LocalProperties in2) {
-               return in2;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetSecondDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetSecondDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetSecondDescriptor.java
deleted file mode 100644
index 016eb79..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetSecondDescriptor.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.util.Utils;
-
-/**
- * 
- */
-public class CoGroupWithSolutionSetSecondDescriptor extends CoGroupDescriptor {
-       
-       public CoGroupWithSolutionSetSecondDescriptor(FieldList keys1, 
FieldList keys2) {
-               super(keys1, keys2);
-       }
-       
-       @Override
-       protected List<LocalPropertiesPair> createPossibleLocalProperties() {
-               RequestedLocalProperties sort = new 
RequestedLocalProperties(Utils.createOrdering(this.keys1));
-               RequestedLocalProperties none = new RequestedLocalProperties();
-               return Collections.singletonList(new LocalPropertiesPair(sort, 
none));
-       }
-       
-       @Override
-       public boolean areCoFulfilled(RequestedLocalProperties requested1, 
RequestedLocalProperties requested2,
-                       LocalProperties produced1, LocalProperties produced2)
-       {
-               return true;
-       }
-
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties in1, 
LocalProperties in2) {
-               return in1;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
deleted file mode 100644
index bcd4d73..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-
-public class CollectorMapDescriptor extends OperatorDescriptorSingle {
-
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.COLLECTOR_MAP;
-       }
-
-       @Override
-       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               return new SingleInputPlanNode(node, "Map ("+node.getOperator().getName()+")", in, DriverStrategy.COLLECTOR_MAP);
-       }
-
-       @Override
-       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
-               RequestedGlobalProperties rgp = new RequestedGlobalProperties();
-               rgp.setAnyDistribution();
-               return Collections.singletonList(rgp);
-       }
-
-       @Override
-       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
-               return Collections.singletonList(new 
RequestedLocalProperties());
-       }
-       
-       @Override
-       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
-               if (gProps.getUniqueFieldCombination() != null && 
gProps.getUniqueFieldCombination().size() > 0 &&
-                               gProps.getPartitioning() == 
PartitioningProperty.RANDOM_PARTITIONED)
-               {
-                       
gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
-               }
-               gProps.clearUniqueFieldCombinations();
-               return gProps;
-       }
-       
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties lProps) {
-               return lProps.clearUniqueFieldSets();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossBlockOuterFirstDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossBlockOuterFirstDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossBlockOuterFirstDescriptor.java
deleted file mode 100644
index 4a467dc..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossBlockOuterFirstDescriptor.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-
-public class CrossBlockOuterFirstDescriptor extends CartesianProductDescriptor 
{
-       
-       public CrossBlockOuterFirstDescriptor() {
-               this(true, true);
-       }
-       
-       public CrossBlockOuterFirstDescriptor(boolean allowBroadcastFirst, 
boolean allowBroadcastSecond) {
-               super(allowBroadcastFirst, allowBroadcastSecond);
-       }
-       
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST;
-       }
-       
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties in1, 
LocalProperties in2) {
-               return new LocalProperties();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossBlockOuterSecondDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossBlockOuterSecondDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossBlockOuterSecondDescriptor.java
deleted file mode 100644
index 212e048..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossBlockOuterSecondDescriptor.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-
-public class CrossBlockOuterSecondDescriptor extends 
CartesianProductDescriptor {
-       
-       public CrossBlockOuterSecondDescriptor() {
-               this(true, true);
-       }
-       
-       public CrossBlockOuterSecondDescriptor(boolean allowBroadcastFirst, 
boolean allowBroadcastSecond) {
-               super(allowBroadcastFirst, allowBroadcastSecond);
-       }
-       
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND;
-       }
-
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties in1, 
LocalProperties in2) {
-               return new LocalProperties();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterFirstDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterFirstDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterFirstDescriptor.java
deleted file mode 100644
index 9c7bd63..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterFirstDescriptor.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-
-public class CrossStreamOuterFirstDescriptor extends 
CartesianProductDescriptor {
-       
-       public CrossStreamOuterFirstDescriptor() {
-               this(true, true);
-       }
-       
-       public CrossStreamOuterFirstDescriptor(boolean allowBroadcastFirst, 
boolean allowBroadcastSecond) {
-               super(allowBroadcastFirst, allowBroadcastSecond);
-       }
-       
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST;
-       }
-
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties in1, 
LocalProperties in2) {
-               // uniqueness becomes grouping with streamed nested loops
-               if ((in1.getGroupedFields() == null || 
in1.getGroupedFields().size() == 0) &&
-                               in1.getUniqueFields() != null && 
in1.getUniqueFields().size() > 0)
-               {
-                       return 
LocalProperties.forGrouping(in1.getUniqueFields().iterator().next().toFieldList());
-               } else {
-                       return in1.clearUniqueFieldSets();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterSecondDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterSecondDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterSecondDescriptor.java
deleted file mode 100644
index 3fabad6..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterSecondDescriptor.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-
-public class CrossStreamOuterSecondDescriptor extends 
CartesianProductDescriptor {
-       
-       public CrossStreamOuterSecondDescriptor() {
-               this(true, true);
-       }
-       
-       public CrossStreamOuterSecondDescriptor(boolean allowBroadcastFirst, 
boolean allowBroadcastSecond) {
-               super(allowBroadcastFirst, allowBroadcastSecond);
-       }
-       
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND;
-       }
-
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties in1, 
LocalProperties in2) {
-               // uniqueness becomes grouping with streamed nested loops
-               if ((in2.getGroupedFields() == null || 
in2.getGroupedFields().size() == 0) &&
-                               in2.getUniqueFields() != null && 
in2.getUniqueFields().size() > 0)
-               {
-                       return 
LocalProperties.forGrouping(in2.getUniqueFields().iterator().next().toFieldList());
-               } else {
-                       return in2.clearUniqueFieldSets();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
deleted file mode 100644
index 81c823f..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-
-public class FilterDescriptor extends OperatorDescriptorSingle {
-
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.FLAT_MAP;
-       }
-
-       @Override
-       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               return new SingleInputPlanNode(node, "Filter ("+node.getOperator().getName()+")", in, DriverStrategy.FLAT_MAP);
-       }
-
-       @Override
-       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
-               RequestedGlobalProperties rgp = new RequestedGlobalProperties();
-               rgp.setAnyDistribution();
-               return Collections.singletonList(rgp);
-       }
-
-       @Override
-       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
-               return Collections.singletonList(new 
RequestedLocalProperties());
-       }
-       
-       @Override
-       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
-               return gProps;
-       }
-       
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties lProps) {
-               return lProps;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
deleted file mode 100644
index b915e45..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-
-public class FlatMapDescriptor extends OperatorDescriptorSingle {
-
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.FLAT_MAP;
-       }
-
-       @Override
-       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               return new SingleInputPlanNode(node, "FlatMap ("+node.getOperator().getName()+")", in, DriverStrategy.FLAT_MAP);
-       }
-
-       @Override
-       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
-               RequestedGlobalProperties rgp = new RequestedGlobalProperties();
-               rgp.setAnyDistribution();
-               return Collections.singletonList(rgp);
-       }
-
-       @Override
-       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
-               return Collections.singletonList(new 
RequestedLocalProperties());
-       }
-       
-       @Override
-       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
-               if (gProps.getUniqueFieldCombination() != null && 
gProps.getUniqueFieldCombination().size() > 0 &&
-                               gProps.getPartitioning() == 
PartitioningProperty.RANDOM_PARTITIONED)
-               {
-                       
gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
-               }
-               gProps.clearUniqueFieldCombinations();
-               return gProps;
-       }
-       
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties lProps) {
-               return lProps.clearUniqueFieldSets();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
deleted file mode 100644
index b648386..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * The properties file belonging to the GroupCombineNode. It translates the GroupCombine operation
- * to the driver strategy SORTED_GROUP_COMBINE and sets the relevant grouping and sorting keys.
- * @see org.apache.flink.optimizer.dag.GroupCombineNode
- */
-public final class GroupCombineProperties extends OperatorDescriptorSingle {
-
-       private final Ordering ordering;        // ordering that we need to use if an additional ordering is requested 
-
-       public GroupCombineProperties(FieldSet groupKeys, Ordering 
additionalOrderKeys) {
-               super(groupKeys);
-
-               // if we have an additional ordering, construct the ordering to have primarily the grouping fields
-               
-               this.ordering = new Ordering();
-               for (Integer key : this.keyList) {
-                       this.ordering.appendOrdering(key, null, Order.ANY);
-               }
-
-               // and next the additional order fields
-               if (additionalOrderKeys != null) {
-                       for (int i = 0; i < 
additionalOrderKeys.getNumberOfFields(); i++) {
-                               Integer field = 
additionalOrderKeys.getFieldNumber(i);
-                               Order order = additionalOrderKeys.getOrder(i);
-                               this.ordering.appendOrdering(field, 
additionalOrderKeys.getType(i), order);
-                       }
-               }
-
-       }
-
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.SORTED_GROUP_COMBINE;
-       }
-
-       @Override
-       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               node.setDegreeOfParallelism(in.getSource().getParallelism());
-               
-               // sorting key info
-               SingleInputPlanNode singleInputPlanNode = new SingleInputPlanNode(
-                               node, 
-                               "GroupCombine (" + node.getOperator().getName() + ")",
-                               in, // reuse the combine strategy also used in the group reduce
-                               DriverStrategy.SORTED_GROUP_COMBINE, this.keyList);
-
-               // set sorting comparator key info
-               
singleInputPlanNode.setDriverKeyInfo(this.ordering.getInvolvedIndexes(), 
this.ordering.getFieldSortDirections(), 0);
-               // set grouping comparator key info
-               singleInputPlanNode.setDriverKeyInfo(this.keyList, 1);
-               
-               return singleInputPlanNode;
-       }
-
-       @Override
-       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
-               RequestedGlobalProperties props = new 
RequestedGlobalProperties();
-               props.setRandomPartitioning();
-               return Collections.singletonList(props);
-       }
-
-       @Override
-       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
-               return Collections.singletonList(new 
RequestedLocalProperties());
-       }
-
-       @Override
-       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
-               if (gProps.getUniqueFieldCombination() != null && 
gProps.getUniqueFieldCombination().size() > 0 &&
-                               gProps.getPartitioning() == 
PartitioningProperty.RANDOM_PARTITIONED) {
-                       
gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
-               }
-               gProps.clearUniqueFieldCombinations();
-               return gProps;
-       }
-
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties lProps) {
-               return lProps.clearUniqueFieldSets();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
deleted file mode 100644
index ebd09f2..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-public final class GroupReduceProperties extends OperatorDescriptorSingle {
-       
-       private final Ordering ordering;                // ordering that we 
need to use if an additional ordering is requested 
-
-       private final Partitioner<?> customPartitioner;
-       
-       
-       public GroupReduceProperties(FieldSet keys) {
-               this(keys, null, null);
-       }
-       
-       public GroupReduceProperties(FieldSet keys, Ordering 
additionalOrderKeys) {
-               this(keys, additionalOrderKeys, null);
-       }
-       
-       public GroupReduceProperties(FieldSet keys, Partitioner<?> 
customPartitioner) {
-               this(keys, null, customPartitioner);
-       }
-       
-       public GroupReduceProperties(FieldSet groupKeys, Ordering 
additionalOrderKeys, Partitioner<?> customPartitioner) {
-               super(groupKeys);
-               
-               // if we have an additional ordering, construct the ordering to 
have primarily the grouping fields
-               if (additionalOrderKeys != null) {
-                       this.ordering = new Ordering();
-                       for (Integer key : this.keyList) {
-                               this.ordering.appendOrdering(key, null, 
Order.ANY);
-                       }
-               
-                       // and next the additional order fields
-                       for (int i = 0; i < 
additionalOrderKeys.getNumberOfFields(); i++) {
-                               Integer field = 
additionalOrderKeys.getFieldNumber(i);
-                               Order order = additionalOrderKeys.getOrder(i);
-                               this.ordering.appendOrdering(field, 
additionalOrderKeys.getType(i), order);
-                       }
-               }
-               else {
-                       this.ordering = null;
-               }
-               
-               this.customPartitioner = customPartitioner;
-       }
-       
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.SORTED_GROUP_REDUCE;
-       }
-
-       @Override
-       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               return new SingleInputPlanNode(node, "GroupReduce 
("+node.getOperator().getName()+")", in, DriverStrategy.SORTED_GROUP_REDUCE, 
this.keyList);
-       }
-
-       @Override
-       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
-               RequestedGlobalProperties props = new 
RequestedGlobalProperties();
-               
-               if (customPartitioner == null) {
-                       props.setAnyPartitioning(this.keys);
-               } else {
-                       props.setCustomPartitioned(this.keys, 
this.customPartitioner);
-               }
-               return Collections.singletonList(props);
-       }
-
-       @Override
-       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
-               RequestedLocalProperties props = new RequestedLocalProperties();
-               if (this.ordering == null) {
-                       props.setGroupedFields(this.keys);
-               } else {
-                       props.setOrdering(this.ordering);
-               }
-               return Collections.singletonList(props);
-       }
-       
-       @Override
-       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
-               if (gProps.getUniqueFieldCombination() != null && 
gProps.getUniqueFieldCombination().size() > 0 &&
-                               gProps.getPartitioning() == 
PartitioningProperty.RANDOM_PARTITIONED)
-               {
-                       
gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
-               }
-               gProps.clearUniqueFieldCombinations();
-               return gProps;
-       }
-       
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties lProps) {
-               return lProps.clearUniqueFieldSets();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
deleted file mode 100644
index c4f47d3..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.GroupReduceNode;
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-
-public final class GroupReduceWithCombineProperties extends 
OperatorDescriptorSingle {
-       
-       private final Ordering ordering;                // ordering that we 
need to use if an additional ordering is requested 
-       
-       private final Partitioner<?> customPartitioner;
-       
-       
-       public GroupReduceWithCombineProperties(FieldSet groupKeys) {
-               this(groupKeys, null, null);
-       }
-       
-       public GroupReduceWithCombineProperties(FieldSet groupKeys, Ordering 
additionalOrderKeys) {
-               this(groupKeys, additionalOrderKeys, null);
-       }
-       
-       public GroupReduceWithCombineProperties(FieldSet groupKeys, 
Partitioner<?> customPartitioner) {
-               this(groupKeys, null, customPartitioner);
-       }
-       
-       public GroupReduceWithCombineProperties(FieldSet groupKeys, Ordering 
additionalOrderKeys, Partitioner<?> customPartitioner) {
-               super(groupKeys);
-               
-               // if we have an additional ordering, construct the ordering to 
have primarily the grouping fields
-               if (additionalOrderKeys != null) {
-                       this.ordering = new Ordering();
-                       for (Integer key : this.keyList) {
-                               this.ordering.appendOrdering(key, null, 
Order.ANY);
-                       }
-               
-                       // and next the additional order fields
-                       for (int i = 0; i < 
additionalOrderKeys.getNumberOfFields(); i++) {
-                               Integer field = 
additionalOrderKeys.getFieldNumber(i);
-                               Order order = additionalOrderKeys.getOrder(i);
-                               this.ordering.appendOrdering(field, 
additionalOrderKeys.getType(i), order);
-                       }
-               } else {
-                       this.ordering = null;
-               }
-               
-               this.customPartitioner = customPartitioner;
-       }
-       
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.SORTED_GROUP_REDUCE;
-       }
-
-       @Override
-       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
-                       // adjust a sort (changes grouping, so it must be for 
this driver to combining sort
-                       if (in.getLocalStrategy() == LocalStrategy.SORT) {
-                               if 
(!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) {
-                                       throw new RuntimeException("Bug: 
Inconsistent sort for group strategy.");
-                               }
-                               
in.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
-                                                                       
in.getLocalStrategySortOrder());
-                       }
-                       return new SingleInputPlanNode(node, 
"Reduce("+node.getOperator().getName()+")", in,
-                                                                               
        DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
-               } else {
-                       // non forward case. all local properties are killed 
anyways, so we can safely plug in a combiner
-                       Channel toCombiner = new Channel(in.getSource());
-                       toCombiner.setShipStrategy(ShipStrategyType.FORWARD, 
DataExchangeMode.PIPELINED);
-
-                       // create an input node for combine with same DOP as 
input node
-                       GroupReduceNode combinerNode = ((GroupReduceNode) 
node).getCombinerUtilityNode();
-                       
combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
-
-                       SingleInputPlanNode combiner = new 
SingleInputPlanNode(combinerNode, "Combine("+node.getOperator()
-                                       .getName()+")", toCombiner, 
DriverStrategy.SORTED_GROUP_COMBINE);
-                       combiner.setCosts(new Costs(0, 0));
-                       
combiner.initProperties(toCombiner.getGlobalProperties(), 
toCombiner.getLocalProperties());
-                       // set sorting comparator key info
-                       combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), 
in.getLocalStrategySortOrder(), 0);
-                       // set grouping comparator key info
-                       combiner.setDriverKeyInfo(this.keyList, 1);
-                       
-                       Channel toReducer = new Channel(combiner);
-                       toReducer.setShipStrategy(in.getShipStrategy(), 
in.getShipStrategyKeys(),
-                                                                       
in.getShipStrategySortOrder(), in.getDataExchangeMode());
-                       toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, 
in.getLocalStrategyKeys(),
-                                                                               
in.getLocalStrategySortOrder());
-
-                       return new SingleInputPlanNode(node, "Reduce 
("+node.getOperator().getName()+")",
-                                                                               
        toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
-               }
-       }
-
-       @Override
-       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
-               RequestedGlobalProperties props = new 
RequestedGlobalProperties();
-               if (customPartitioner == null) {
-                       props.setAnyPartitioning(this.keys);
-               } else {
-                       props.setCustomPartitioned(this.keys, 
this.customPartitioner);
-               }
-               return Collections.singletonList(props);
-       }
-
-       @Override
-       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
-               RequestedLocalProperties props = new RequestedLocalProperties();
-               if (this.ordering == null) {
-                       props.setGroupedFields(this.keys);
-               } else {
-                       props.setOrdering(this.ordering);
-               }
-               return Collections.singletonList(props);
-       }
-
-       @Override
-       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
-               if (gProps.getUniqueFieldCombination() != null && 
gProps.getUniqueFieldCombination().size() > 0 &&
-                               gProps.getPartitioning() == 
PartitioningProperty.RANDOM_PARTITIONED)
-               {
-                       
gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
-               }
-               gProps.clearUniqueFieldCombinations();
-               return gProps;
-       }
-
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties lProps) {
-               return lProps.clearUniqueFieldSets();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
deleted file mode 100644
index fec72a9..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.TwoInputNode;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
- *
- */
-public class HashJoinBuildFirstProperties extends AbstractJoinDescriptor {
-       
-       public HashJoinBuildFirstProperties(FieldList keys1, FieldList keys2) {
-               super(keys1, keys2);
-       }
-       
-       public HashJoinBuildFirstProperties(FieldList keys1, FieldList keys2,
-                       boolean broadcastFirstAllowed, boolean 
broadcastSecondAllowed, boolean repartitionAllowed)
-       {
-               super(keys1, keys2, broadcastFirstAllowed, 
broadcastSecondAllowed, repartitionAllowed);
-       }
-
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.HYBRIDHASH_BUILD_FIRST;
-       }
-
-       @Override
-       protected List<LocalPropertiesPair> createPossibleLocalProperties() {
-               // all properties are possible
-               return Collections.singletonList(new LocalPropertiesPair(new 
RequestedLocalProperties(), new RequestedLocalProperties()));
-       }
-       
-       @Override
-       public boolean areCoFulfilled(RequestedLocalProperties requested1, 
RequestedLocalProperties requested2,
-                       LocalProperties produced1, LocalProperties produced2)
-       {
-               return true;
-       }
-
-       @Override
-       public DualInputPlanNode instantiate(Channel in1, Channel in2, 
TwoInputNode node) {
-               DriverStrategy strategy;
-               
-               if(!in1.isOnDynamicPath() && in2.isOnDynamicPath()) {
-                       // sanity check that the first input is cached and 
remove that cache
-                       if (!in1.getTempMode().isCached()) {
-                               throw new CompilerException("No cache at point 
where static and dynamic parts meet.");
-                       }
-                       in1.setTempMode(in1.getTempMode().makeNonCached());
-                       strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED;
-               }
-               else {
-                       strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST;
-               }
-               return new DualInputPlanNode(node, 
"Join("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, 
this.keys2);
-       }
-       
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties in1, 
LocalProperties in2) {
-               return new LocalProperties();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
deleted file mode 100644
index f9d1e6c..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.TwoInputNode;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-public final class HashJoinBuildSecondProperties extends 
AbstractJoinDescriptor {
-       
-       public HashJoinBuildSecondProperties(FieldList keys1, FieldList keys2) {
-               super(keys1, keys2);
-       }
-       
-       public HashJoinBuildSecondProperties(FieldList keys1, FieldList keys2,
-                       boolean broadcastFirstAllowed, boolean 
broadcastSecondAllowed, boolean repartitionAllowed)
-       {
-               super(keys1, keys2, broadcastFirstAllowed, 
broadcastSecondAllowed, repartitionAllowed);
-       }
-
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.HYBRIDHASH_BUILD_SECOND;
-       }
-
-       @Override
-       protected List<LocalPropertiesPair> createPossibleLocalProperties() {
-               // all properties are possible
-               return Collections.singletonList(new LocalPropertiesPair(
-                       new RequestedLocalProperties(), new 
RequestedLocalProperties()));
-       }
-       
-       @Override
-       public boolean areCoFulfilled(RequestedLocalProperties requested1, 
RequestedLocalProperties requested2,
-                       LocalProperties produced1, LocalProperties produced2)
-       {
-               return true;
-       }
-
-       @Override
-       public DualInputPlanNode instantiate(Channel in1, Channel in2, 
TwoInputNode node) {
-               DriverStrategy strategy;
-               
-               if (!in2.isOnDynamicPath() && in1.isOnDynamicPath()) {
-                       // sanity check that the first input is cached and 
remove that cache
-                       if (!in2.getTempMode().isCached()) {
-                               throw new CompilerException("No cache at point 
where static and dynamic parts meet.");
-                       }
-                       
-                       in2.setTempMode(in2.getTempMode().makeNonCached());
-                       strategy = 
DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED;
-               }
-               else {
-                       strategy = DriverStrategy.HYBRIDHASH_BUILD_SECOND;
-               }
-               return new DualInputPlanNode(node, "Join 
("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, 
this.keys2);
-       }
-       
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties in1, 
LocalProperties in2) {
-               return new LocalProperties();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
deleted file mode 100644
index 9f14d2a..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-
-public class MapDescriptor extends OperatorDescriptorSingle {
-
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.MAP;
-       }
-
-       @Override
-       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               return new SingleInputPlanNode(node, "Map 
("+node.getOperator().getName()+")", in, DriverStrategy.MAP);
-       }
-
-       @Override
-       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
-               RequestedGlobalProperties rgp = new RequestedGlobalProperties();
-               rgp.setAnyDistribution();
-               return Collections.singletonList(rgp);
-       }
-
-       @Override
-       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
-               return Collections.singletonList(new 
RequestedLocalProperties());
-       }
-       
-       @Override
-       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
-               return gProps;
-       }
-       
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties lProps) {
-               return lProps;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
 
b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
deleted file mode 100644
index 1489097..0000000
--- 
a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-
-public class MapPartitionDescriptor extends OperatorDescriptorSingle {
-
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.MAP_PARTITION;
-       }
-
-       @Override
-       public SingleInputPlanNode instantiate(Channel in, SingleInputNode 
node) {
-               return new SingleInputPlanNode(node, "MapPartition 
("+node.getOperator().getName()+")", in, DriverStrategy.MAP_PARTITION);
-       }
-
-       @Override
-       protected List<RequestedGlobalProperties> 
createPossibleGlobalProperties() {
-               RequestedGlobalProperties rgp = new RequestedGlobalProperties();
-               rgp.setAnyDistribution();
-               return Collections.singletonList(rgp);
-       }
-
-       @Override
-       protected List<RequestedLocalProperties> 
createPossibleLocalProperties() {
-               return Collections.singletonList(new 
RequestedLocalProperties());
-       }
-       
-       @Override
-       public GlobalProperties computeGlobalProperties(GlobalProperties 
gProps) {
-               return gProps;
-       }
-       
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties lProps) {
-               return lProps;
-       }
-}

Reply via email to