http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
deleted file mode 100644
index 565d992..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.base.BulkIterationBase;
-import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.util.OperatingSystem;
-import org.apache.flink.util.Visitor;
-import org.junit.Before;
-
-/**
- * Base class for Optimizer tests. Offers utility methods to trigger optimization
- * of a program and to fetch the nodes in an optimizer plan that correspond
- * to the nodes in the program plan.
- */
-public abstract class CompilerTestBase implements java.io.Serializable {
-       
-       private static final long serialVersionUID = 1L;
-
-       protected static final String IN_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/random";
-       
-       protected static final String OUT_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/null";
-       
-       protected static final int DEFAULT_PARALLELISM = 8;
-       
-       private static final String CACHE_KEY = "cachekey";
-       
-       // ------------------------------------------------------------------------
-       
-       protected transient DataStatistics dataStats;
-       
-       protected transient Optimizer withStatsCompiler;
-       
-       protected transient Optimizer noStatsCompiler;
-       
-       private transient int statCounter;
-       
-       // ------------------------------------------------------------------------
-       
-       @Before
-       public void setup() {
-               this.dataStats = new DataStatistics();
-               this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator());
-               this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
-               
-               this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator());
-               this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
-       }
-       
-       // ------------------------------------------------------------------------
-       
-       public OptimizedPlan compileWithStats(Plan p) {
-               return this.withStatsCompiler.compile(p);
-       }
-       
-       public OptimizedPlan compileNoStats(Plan p) {
-               return this.noStatsCompiler.compile(p);
-       }
-       
-       public void setSourceStatistics(GenericDataSourceBase<?, ?> source, long size, float recordWidth) {
-               setSourceStatistics(source, new FileBaseStatistics(Long.MAX_VALUE, size, recordWidth));
-       }
-       
-       public void setSourceStatistics(GenericDataSourceBase<?, ?> source, FileBaseStatistics stats) {
-               final String key = CACHE_KEY + this.statCounter++;
-               this.dataStats.cacheBaseStatistics(stats, key);
-               source.setStatisticsKey(key);
-       }
-       
-       // ------------------------------------------------------------------------
-       
-       public static OptimizerPlanNodeResolver getOptimizerPlanNodeResolver(OptimizedPlan plan) {
-               return new OptimizerPlanNodeResolver(plan);
-       }
-       
-       public static final class OptimizerPlanNodeResolver {
-               
-               private final Map<String, ArrayList<PlanNode>> map;
-               
-               OptimizerPlanNodeResolver(OptimizedPlan p) {
-                       HashMap<String, ArrayList<PlanNode>> map = new HashMap<String, ArrayList<PlanNode>>();
-                       
-                       for (PlanNode n : p.getAllNodes()) {
-                               Operator<?> c = n.getOriginalOptimizerNode().getOperator();
-                               String name = c.getName();
-                               
-                               ArrayList<PlanNode> list = map.get(name);
-                               if (list == null) {
-                                       list = new ArrayList<PlanNode>(2);
-                                       map.put(name, list);
-                               }
-                               
-                               // check whether this node is a child of a node with the same contract (aka combiner)
-                               boolean shouldAdd = true;
-                               for (Iterator<PlanNode> iter = list.iterator(); iter.hasNext();) {
-                                       PlanNode in = iter.next();
-                                       if (in.getOriginalOptimizerNode().getOperator() == c) {
-                                               // is this the child, or is our node the child?
-                                               if (in instanceof SingleInputPlanNode && n instanceof SingleInputPlanNode) {
-                                                       SingleInputPlanNode thisNode = (SingleInputPlanNode) n;
-                                                       SingleInputPlanNode otherNode = (SingleInputPlanNode) in;
-                                                       
-                                                       if (thisNode.getPredecessor() == otherNode) {
-                                                               // the other node is the child, remove it
-                                                               iter.remove();
-                                                       } else if (otherNode.getPredecessor() == thisNode) {
-                                                               shouldAdd = false;
-                                                       }
-                                               } else {
-                                                       throw new RuntimeException("Unrecognized case in test.");
-                                               }
-                                       }
-                               }
-                               
-                               if (shouldAdd) {
-                                       list.add(n);
-                               }
-                       }
-                       
-                       this.map = map;
-               }
-               
-               
-               @SuppressWarnings("unchecked")
-               public <T extends PlanNode> T getNode(String name) {
-                       List<PlanNode> nodes = this.map.get(name);
-                       if (nodes == null || nodes.isEmpty()) {
-                               throw new RuntimeException("No node found with 
the given name.");
-                       } else if (nodes.size() != 1) {
-                               throw new RuntimeException("Multiple nodes 
found with the given name.");
-                       } else {
-                               return (T) nodes.get(0);
-                       }
-               }
-               
-               @SuppressWarnings("unchecked")
-               public <T extends PlanNode> T getNode(String name, Class<? extends Function> stubClass) {
-                       List<PlanNode> nodes = this.map.get(name);
-                       if (nodes == null || nodes.isEmpty()) {
-                               throw new RuntimeException("No node found with the given name and stub class.");
-                       } else {
-                               PlanNode found = null;
-                               for (PlanNode node : nodes) {
-                                       if (node.getClass() == stubClass) {
-                                               if (found == null) {
-                                                       found = node;
-                                               } else {
-                                                       throw new RuntimeException("Multiple nodes found with the given name and stub class.");
-                                               }
-                                       }
-                               }
-                               if (found == null) {
-                                       throw new RuntimeException("No node 
found with the given name and stub class.");
-                               } else {
-                                       return (T) found;
-                               }
-                       }
-               }
-               
-               public List<PlanNode> getNodes(String name) {
-                       List<PlanNode> nodes = this.map.get(name);
-                       if (nodes == null || nodes.isEmpty()) {
-                               throw new RuntimeException("No node found with 
the given name.");
-                       } else {
-                               return new ArrayList<PlanNode>(nodes);
-                       }
-               }
-       }
-
-       /**
-        * Collects all data sources of a plan so that statistics can be attached to them.
-        */
-       public static class SourceCollectorVisitor implements Visitor<Operator<?>> {
-               
-               protected final List<GenericDataSourceBase<?, ?>> sources = new ArrayList<GenericDataSourceBase<?, ?>>(4);
-
-               @Override
-               public boolean preVisit(Operator<?> visitable) {
-                       
-                       if (visitable instanceof GenericDataSourceBase) {
-                               sources.add((GenericDataSourceBase<?, ?>) visitable);
-                       }
-                       else if (visitable instanceof BulkIterationBase) {
-                               ((BulkIterationBase<?>) visitable).getNextPartialSolution().accept(this);
-                       }
-                       
-                       return true;
-               }
-
-               @Override
-               public void postVisit(Operator<?> visitable) {}
-               
-               public List<GenericDataSourceBase<?, ?>> getSources() {
-                       return this.sources;
-               }
-               
-       }
-}
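
For orientation, tests built on this (now removed) base class typically drove the optimizer as in the sketch below. The program, node names, and assertions are illustrative assumptions, not code from this commit; only the helpers (compileNoStats, getOptimizerPlanNodeResolver, OptimizerPlanNodeResolver.getNode) come from the deleted file above.

    import org.apache.flink.api.common.Plan;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.optimizer.plan.OptimizedPlan;
    import org.apache.flink.optimizer.plan.SingleInputPlanNode;
    import org.junit.Test;

    // Hypothetical usage sketch of the removed helpers; all names are assumed.
    public class ExampleOptimizerTest extends CompilerTestBase {

        @Test
        public void testIdentityMapPlan() {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            env.generateSequence(1, 100)
                .map(new MapFunction<Long, Long>() {
                    public Long map(Long value) { return value; }
                }).name("mapper")
                .print().name("sink");

            Plan plan = env.createProgramPlan();

            // compile without statistics, then look plan nodes up by operator name
            OptimizedPlan optimized = compileNoStats(plan);
            OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(optimized);
            SingleInputPlanNode mapNode = resolver.getNode("mapper");
            // ... assert on ship strategy, driver strategy, parallelism, etc.
        }
    }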

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
deleted file mode 100644
index b17e777..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.optimizer;
-
-import org.junit.Assert;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyMatchStub;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.util.Visitor;
-import org.junit.Test;
-
-/**
- * Tests in this class:
- * <ul>
- *   <li>Tests that check the correct handling of the properties and strategies in the case where the degree of
- *       parallelism between tasks is increased or decreased.
- * </ul>
- */
-@SuppressWarnings({"serial", "deprecation"})
-public class DOPChangeTest extends CompilerTestBase {
-       
-       /**
-        * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
-        * 
-        * Increases DOP between 1st reduce and 2nd map, so the hash partitioning from 1st reduce is not reusable.
-        * Expected to re-establish partitioning between reduce and map, via hash, because random is a full network
-        * transit as well.
-        */
-       @Test
-       public void checkPropertyHandlingWithIncreasingGlobalParallelism1() {
-               final int degOfPar = DEFAULT_PARALLELISM;
-               
-               // construct the plan
-               FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-               source.setDegreeOfParallelism(degOfPar);
-               
-               MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
-               map1.setDegreeOfParallelism(degOfPar);
-               map1.setInput(source);
-               
-               ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
-               reduce1.setDegreeOfParallelism(degOfPar);
-               reduce1.setInput(map1);
-               
-               MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
-               map2.setDegreeOfParallelism(degOfPar * 2);
-               map2.setInput(reduce1);
-               
-               ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
-               reduce2.setDegreeOfParallelism(degOfPar * 2);
-               reduce2.setInput(map2);
-               
-               FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-               sink.setDegreeOfParallelism(degOfPar * 2);
-               sink.setInput(reduce2);
-               
-               Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
-               
-               // submit the plan to the compiler
-               OptimizedPlan oPlan = compileNoStats(plan);
-               
-               // check the optimized plan
-               // when reducer 1 distributes its data across the instances of map2, it needs to employ hash partitioning,
-               // because map2 has twice as many instances and key/value pairs with the same key need to be
-               // processed by the same mapper and reducer, respectively
-               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-               SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
-               SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-               
-               ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
-               ShipStrategyType redIn = red2Node.getInput().getShipStrategy();
-               
-               Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, mapIn);
-               Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, redIn);
-       }
-       
-       /**
-        * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
-        * 
-        * Increases DOP between 2nd map and 2nd reduce, so the hash partitioning from 1st reduce is not reusable.
-        * Expected to re-establish partitioning between map and reduce (hash).
-        */
-       @Test
-       public void checkPropertyHandlingWithIncreasingGlobalParallelism2() {
-               final int degOfPar = DEFAULT_PARALLELISM;
-               
-               // construct the plan
-               FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-               source.setDegreeOfParallelism(degOfPar);
-               
-               MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
-               map1.setDegreeOfParallelism(degOfPar);
-               map1.setInput(source);
-               
-               ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
-               reduce1.setDegreeOfParallelism(degOfPar);
-               reduce1.setInput(map1);
-               
-               MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
-               map2.setDegreeOfParallelism(degOfPar);
-               map2.setInput(reduce1);
-               
-               ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
-               reduce2.setDegreeOfParallelism(degOfPar * 2);
-               reduce2.setInput(map2);
-               
-               FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-               sink.setDegreeOfParallelism(degOfPar * 2);
-               sink.setInput(reduce2);
-               
-               Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
-               
-               // submit the plan to the compiler
-               OptimizedPlan oPlan = compileNoStats(plan);
-               
-               // check the optimized plan
-               // reduce2 runs with twice the parallelism of map2, so the data must be re-partitioned
-               // by hash between map2 and reduce2
-               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-               SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
-               SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-               
-               ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
-               ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
-               
-               Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, mapIn);
-               Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, reduceIn);
-       }
-       
-       /**
-        * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve all fields (hence all properties).
-        * 
-        * Increases DOP between 1st reduce and 2nd map, such that more tasks are on one instance.
-        * Expected to re-establish partitioning between map and reduce via a local hash.
-        */
-       @Test
-       public void checkPropertyHandlingWithIncreasingLocalParallelism() {
-               final int degOfPar = 2 * DEFAULT_PARALLELISM;
-               
-               // construct the plan
-               FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-               source.setDegreeOfParallelism(degOfPar);
-               
-               MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
-               map1.setDegreeOfParallelism(degOfPar);
-               map1.setInput(source);
-               
-               ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
-               reduce1.setDegreeOfParallelism(degOfPar);
-               reduce1.setInput(map1);
-               
-               MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
-               map2.setDegreeOfParallelism(degOfPar * 2);
-               map2.setInput(reduce1);
-               
-               ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
-               reduce2.setDegreeOfParallelism(degOfPar * 2);
-               reduce2.setInput(map2);
-               
-               FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-               sink.setDegreeOfParallelism(degOfPar * 2);
-               sink.setInput(reduce2);
-               
-               Plan plan = new Plan(sink, "Test Increasing Degree Of Parallelism");
-               
-               // submit the plan to the compiler
-               OptimizedPlan oPlan = compileNoStats(plan);
-               
-               // check the optimized plan
-               // the partitioning must be re-established: either hash-partition into map2 and forward into
-               // reduce2, or randomly re-distribute into map2 and hash-partition into reduce2
-               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-               SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
-               SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-               
-               ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
-               ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
-               
-               Assert.assertTrue("Invalid ship strategy for an operator.",
-                               (ShipStrategyType.PARTITION_RANDOM == mapIn && ShipStrategyType.PARTITION_HASH == reduceIn) ||
-                               (ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn));
-       }
-       
-       
-       
-       @Test
-       public void checkPropertyHandlingWithDecreasingDegreeOfParallelism() {
-               final int degOfPar = DEFAULT_PARALLELISM;
-               
-               // construct the plan
-               FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-               source.setDegreeOfParallelism(degOfPar * 2);
-               
-               MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
-               map1.setDegreeOfParallelism(degOfPar * 2);
-               map1.setInput(source);
-               
-               ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
-               reduce1.setDegreeOfParallelism(degOfPar * 2);
-               reduce1.setInput(map1);
-               
-               MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
-               map2.setDegreeOfParallelism(degOfPar);
-               map2.setInput(reduce1);
-               
-               ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
-               reduce2.setDegreeOfParallelism(degOfPar);
-               reduce2.setInput(map2);
-               
-               FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-               sink.setDegreeOfParallelism(degOfPar);
-               sink.setInput(reduce2);
-               
-               Plan plan = new Plan(sink, "Test Decreasing Degree Of Parallelism");
-               
-               // submit the plan to the compiler
-               OptimizedPlan oPlan = compileNoStats(plan);
-
-               // check the optimized plan
-               // the degree of parallelism decreases between reduce1 and reduce2, so the data must be
-               // re-partitioned by hash and re-sorted on one of the two inputs
-               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
-               SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
-               SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
-
-               Assert.assertTrue("No sorting local strategy.",
-                               LocalStrategy.SORT == red2Node.getInput().getLocalStrategy() ||
-                                               LocalStrategy.SORT == map2Node.getInput().getLocalStrategy());
-
-               Assert.assertTrue("No hash partitioning ship strategy.",
-                               ShipStrategyType.PARTITION_HASH == red2Node.getInput().getShipStrategy() ||
-                                               ShipStrategyType.PARTITION_HASH == map2Node.getInput().getShipStrategy());
-       }
-
-       /**
-        * Checks that re-partitioning happens when the inputs of a two-input contract have different DOPs.
-        * 
-        * Test Plan:
-        * <pre>
-        * 
-        * (source) -> reduce -\
-        *                      Match -> (sink)
-        * (source) -> reduce -/
-        * 
-        * </pre>
-        */
-       @Test
-       public void checkPropertyHandlingWithTwoInputs() {
-               // construct the plan
-               FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
-               FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
-               
-               ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-                       .input(sourceA)
-                       .build();
-               ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-                       .input(sourceB)
-                       .build();
-               
-               JoinOperator mat = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
-                       .input1(redA)
-                       .input2(redB)
-                       .build();
-               
-               FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat);
-               
-               sourceA.setDegreeOfParallelism(5);
-               sourceB.setDegreeOfParallelism(7);
-               redA.setDegreeOfParallelism(5);
-               redB.setDegreeOfParallelism(7);
-               
-               mat.setDegreeOfParallelism(5);
-               
-               sink.setDegreeOfParallelism(5);
-               
-               // return the PACT plan
-               Plan plan = new Plan(sink, "Partition on DoP Change");
-               
-               OptimizedPlan oPlan = compileNoStats(plan);
-               
-               JobGraphGenerator jobGen = new JobGraphGenerator();
-               
-               // compile the plan to verify that no error is thrown
-               jobGen.compileJobGraph(oPlan);
-               
-               oPlan.accept(new Visitor<PlanNode>() {
-                       
-                       @Override
-                       public boolean preVisit(PlanNode visitable) {
-                               if (visitable instanceof DualInputPlanNode) {
-                                       DualInputPlanNode node = (DualInputPlanNode) visitable;
-                                       Channel c1 = node.getInput1();
-                                       Channel c2 = node.getInput2();
-                                       
-                                       Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.FORWARD, c1.getShipStrategy());
-                                       Assert.assertEquals("Incompatible shipping strategy chosen for match", ShipStrategyType.PARTITION_HASH, c2.getShipStrategy());
-                                       return false;
-                               }
-                               return true;
-                       }
-                       
-                       @Override
-                       public void postVisit(PlanNode visitable) {
-                               // do nothing
-                       }
-               });
-       }
-}
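
The scenario these deleted Record-API tests cover can be sketched with the Java API of the same era as well. Everything below is an illustrative assumption (the class name, the sequence source, the sum() aggregation), not code from this commit; the point is only that doubling the parallelism between two keyed operators forces the optimizer to re-establish the hash partitioning, as the assertions above expect.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Hedged sketch of a DOP change between two keyed aggregations.
    public class DopChangeSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setDegreeOfParallelism(8);

            DataSet<Tuple2<Long, Long>> pairs = env.generateSequence(1, 1000)
                    .map(new MapFunction<Long, Tuple2<Long, Long>>() {
                        public Tuple2<Long, Long> map(Long v) {
                            return new Tuple2<Long, Long>(v % 10L, v);
                        }
                    }).name("Map1");

            // The second aggregation doubles the parallelism, so the hash
            // partitioning established for the first one is not reusable and
            // must be re-established by the optimizer.
            pairs.groupBy(0).sum(1).name("Reduce1")
                 .groupBy(0).sum(1).setParallelism(16).name("Reduce2")
                 .print();

            env.execute("DOP change sketch");
        }
    }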

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
deleted file mode 100644
index aaee975..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class DisjointDataFlowsTest extends CompilerTestBase {
-
-       @Test
-       public void testDisjointFlows() {
-               try {
-                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       // generate two different flows
-                       env.generateSequence(1, 10).print();
-                       env.generateSequence(1, 10).print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       new JobGraphGenerator().compileJobGraph(op);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-}
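
The behavior guarded here can be stated compactly: a program with unconnected flows still compiles into a single OptimizedPlan, one sink per flow. The assertion below is a hedged sketch of how that could be checked (an assumption layered on the deleted test, not code from this commit), reusing the getDataSinks() accessor seen elsewhere in these tests.

    // Inside a CompilerTestBase subclass (sketch, assumed assertion):
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.generateSequence(1, 10).print();
    env.generateSequence(1, 10).print();

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // one SinkPlanNode per disjoint flow
    assertEquals(2, op.getDataSinks().size());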

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
deleted file mode 100644
index 34aa9f8..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.optimizer;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.DistinctOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-
-@SuppressWarnings("serial")
-public class DistinctCompilationTest extends CompilerTestBase implements java.io.Serializable {
-
-       @Test
-       public void testDistinctPlain() {
-               try {
-                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(8);
-
-                       DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
-                                       .name("source").setParallelism(6);
-
-                       data
-                                       .distinct().name("reducer")
-                                       .print().name("sink");
-
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-
-                       OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
-
-                       // get the original nodes
-                       SourcePlanNode sourceNode = resolver.getNode("source");
-                       SingleInputPlanNode reduceNode = resolver.getNode("reducer");
-                       SinkPlanNode sinkNode = resolver.getNode("sink");
-
-                       // get the combiner
-                       SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
-
-                       // check the wiring
-                       assertEquals(sourceNode, combineNode.getInput().getSource());
-                       assertEquals(reduceNode, sinkNode.getInput().getSource());
-
-                       // check that reduce and combiner use the expected sort-based strategies
-                       assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
-                       assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
-
-                       // check the keys
-                       assertEquals(new FieldList(0, 1), reduceNode.getKeys(0));
-                       assertEquals(new FieldList(0, 1), combineNode.getKeys(0));
-                       assertEquals(new FieldList(0, 1), reduceNode.getInput().getLocalStrategyKeys());
-
-                       // check the degrees of parallelism
-                       assertEquals(6, sourceNode.getParallelism());
-                       assertEquals(6, combineNode.getParallelism());
-                       assertEquals(8, reduceNode.getParallelism());
-                       assertEquals(8, sinkNode.getParallelism());
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
-               }
-       }
-
-       @Test
-       public void testDistinctWithSelectorFunctionKey() {
-               try {
-                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(8);
-
-                       DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
-                                       .name("source").setParallelism(6);
-
-                       data
-                                       .distinct(new KeySelector<Tuple2<String, Double>, String>() {
-                                               public String getKey(Tuple2<String, Double> value) { return value.f0; }
-                                       }).name("reducer")
-                                       .print().name("sink");
-
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-
-                       OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
-
-                       // get the original nodes
-                       SourcePlanNode sourceNode = resolver.getNode("source");
-                       SingleInputPlanNode reduceNode = resolver.getNode("reducer");
-                       SinkPlanNode sinkNode = resolver.getNode("sink");
-
-                       // get the combiner
-                       SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
-
-                       // get the key extractor and projector
-                       SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
-                       SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
-
-                       // check the wiring
-                       assertEquals(sourceNode, keyExtractor.getInput().getSource());
-                       assertEquals(keyProjector, sinkNode.getInput().getSource());
-
-                       // check that reduce and combiner use the expected sort-based strategies
-                       assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
-                       assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
-
-                       // check the keys
-                       assertEquals(new FieldList(0), reduceNode.getKeys(0));
-                       assertEquals(new FieldList(0), combineNode.getKeys(0));
-                       assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
-
-                       // check the degrees of parallelism
-                       assertEquals(6, sourceNode.getParallelism());
-                       assertEquals(6, keyExtractor.getParallelism());
-                       assertEquals(6, combineNode.getParallelism());
-
-                       assertEquals(8, reduceNode.getParallelism());
-                       assertEquals(8, keyProjector.getParallelism());
-                       assertEquals(8, sinkNode.getParallelism());
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
-               }
-       }
-
-       @Test
-       public void testDistinctWithFieldPositionKeyCombinable() {
-               try {
-                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(8);
-
-                       DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
-                                       .name("source").setParallelism(6);
-
-                       DistinctOperator<Tuple2<String, Double>> reduced = data
-                                       .distinct(1).name("reducer");
-
-                       reduced.print().name("sink");
-
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-
-                       OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
-
-                       // get the original nodes
-                       SourcePlanNode sourceNode = resolver.getNode("source");
-                       SingleInputPlanNode reduceNode = resolver.getNode("reducer");
-                       SinkPlanNode sinkNode = resolver.getNode("sink");
-
-                       // get the combiner
-                       SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
-
-                       // check the wiring
-                       assertEquals(sourceNode, combineNode.getInput().getSource());
-                       assertEquals(reduceNode, sinkNode.getInput().getSource());
-
-                       // check that reduce and combiner use the expected sort-based strategies
-                       assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
-                       assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
-
-                       // check the keys
-                       assertEquals(new FieldList(1), reduceNode.getKeys(0));
-                       assertEquals(new FieldList(1), combineNode.getKeys(0));
-                       assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
-
-                       // check the degrees of parallelism
-                       assertEquals(6, sourceNode.getParallelism());
-                       assertEquals(6, combineNode.getParallelism());
-                       assertEquals(8, reduceNode.getParallelism());
-                       assertEquals(8, sinkNode.getParallelism());
-               }
-               catch (Exception e) {
-                       System.err.println(e.getMessage());
-                       e.printStackTrace();
-                       fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
-               }
-       }
-}
\ No newline at end of file
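
All three deleted tests share one skeleton, which is worth spelling out: distinct() compiles to a sorted group reduce, and the optimizer injects a combiner directly in front of it. The fragment below is a hedged distillation (it assumes a single-sink plan and reuses only the APIs that appear in the tests above), not code from this commit.

    // Walk from the sink to the group reduce and verify the injected combiner.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reduce = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combine = (SingleInputPlanNode) reduce.getInput().getSource();

    assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduce.getDriverStrategy());
    assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combine.getDriverStrategy());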
