http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
new file mode 100644
index 0000000..565d992
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.base.BulkIterationBase;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.Visitor;
+import org.junit.Before;
+
+/**
+ * Base class for Optimizer tests. Offers utility methods to trigger optimization
+ * of a program and to fetch the nodes in an optimizer plan that correspond
+ * to the node in the program plan.
+ */
+public abstract class CompilerTestBase implements java.io.Serializable {
+       
+       private static final long serialVersionUID = 1L;
+
+       protected static final String IN_FILE = OperatingSystem.isWindows() ? 
"file:/c:/" : "file:///dev/random";
+       
+       protected static final String OUT_FILE = OperatingSystem.isWindows() ? 
"file:/c:/" : "file:///dev/null";
+       
+       protected static final int DEFAULT_PARALLELISM = 8;
+       
+       private static final String CACHE_KEY = "cachekey";
+       
+       // ------------------------------------------------------------------------
+       
+       protected transient DataStatistics dataStats;
+       
+       protected transient Optimizer withStatsCompiler;
+       
+       protected transient Optimizer noStatsCompiler;
+       
+       private transient int statCounter;
+       
+       // ------------------------------------------------------------------------
+       
+       /**
+        * Creates fresh optimizer instances before each test: one backed by a new
+        * {@link DataStatistics} object and one constructed without statistics. Both
+        * use {@link #DEFAULT_PARALLELISM} as their default degree of parallelism.
+        */
+       @Before
+       public void setup() {
+               final DataStatistics statistics = new DataStatistics();
+
+               final Optimizer statsAware = new Optimizer(statistics, new DefaultCostEstimator());
+               statsAware.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               final Optimizer statsFree = new Optimizer(null, new DefaultCostEstimator());
+               statsFree.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               this.dataStats = statistics;
+               this.withStatsCompiler = statsAware;
+               this.noStatsCompiler = statsFree;
+       }
+       
+       // ------------------------------------------------------------------------
+       
+       /**
+        * Runs the given plan through the optimizer that was created with statistics.
+        *
+        * @param p the program plan to optimize
+        * @return the plan produced by the statistics-backed optimizer
+        */
+       public OptimizedPlan compileWithStats(Plan p) {
+               final OptimizedPlan optimized = this.withStatsCompiler.compile(p);
+               return optimized;
+       }
+       
+       /**
+        * Runs the given plan through the optimizer that was created without statistics.
+        *
+        * @param p the program plan to optimize
+        * @return the plan produced by the statistics-free optimizer
+        */
+       public OptimizedPlan compileNoStats(Plan p) {
+               final OptimizedPlan optimized = this.noStatsCompiler.compile(p);
+               return optimized;
+       }
+       
+       /**
+        * Convenience overload that wraps the given size and record width in a
+        * {@link FileBaseStatistics} object (with {@code Long.MAX_VALUE} as its first
+        * constructor argument) and registers it for the source.
+        *
+        * @param source      the data source to attach statistics to
+        * @param size        the size value handed to the statistics object
+        * @param recordWidth the record width handed to the statistics object
+        */
+       public void setSourceStatistics(GenericDataSourceBase<?, ?> source, long size, float recordWidth) {
+               final FileBaseStatistics fileStats = new FileBaseStatistics(Long.MAX_VALUE, size, recordWidth);
+               setSourceStatistics(source, fileStats);
+       }
+       
+       /**
+        * Caches the given statistics under a freshly generated key and sets that key
+        * on the source. Keys are built from {@code CACHE_KEY} plus a per-instance counter.
+        *
+        * @param source the data source to attach statistics to
+        * @param stats  the statistics to cache
+        */
+       public void setSourceStatistics(GenericDataSourceBase<?, ?> source, FileBaseStatistics stats) {
+               final int sequence = this.statCounter;
+               this.statCounter = sequence + 1;
+
+               final String cacheKey = CACHE_KEY + sequence;
+               // cache first, then publish the key on the source
+               this.dataStats.cacheBaseStatistics(stats, cacheKey);
+               source.setStatisticsKey(cacheKey);
+       }
+       
+       // ------------------------------------------------------------------------
+       
+       /**
+        * Builds a resolver that looks up the nodes of the given optimized plan by operator name.
+        *
+        * @param plan the optimized plan to index
+        * @return a new resolver over the plan's nodes
+        */
+       public static OptimizerPlanNodeResolver getOptimizerPlanNodeResolver(OptimizedPlan plan) {
+               final OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(plan);
+               return resolver;
+       }
+       
+       public static final class OptimizerPlanNodeResolver {
+               
+               private final Map<String, ArrayList<PlanNode>> map;
+               
+               OptimizerPlanNodeResolver(OptimizedPlan p) {
+                       HashMap<String, ArrayList<PlanNode>> map = new 
HashMap<String, ArrayList<PlanNode>>();
+                       
+                       for (PlanNode n : p.getAllNodes()) {
+                               Operator<?> c = 
n.getOriginalOptimizerNode().getOperator();
+                               String name = c.getName();
+                               
+                               ArrayList<PlanNode> list = map.get(name);
+                               if (list == null) {
+                                       list = new ArrayList<PlanNode>(2);
+                                       map.put(name, list);
+                               }
+                               
+                               // check whether this node is a child of a node 
with the same contract (aka combiner)
+                               boolean shouldAdd = true;
+                               for (Iterator<PlanNode> iter = list.iterator(); 
iter.hasNext();) {
+                                       PlanNode in = iter.next();
+                                       if 
(in.getOriginalOptimizerNode().getOperator() == c) {
+                                               // is this the child or is our 
node the child
+                                               if (in instanceof 
SingleInputPlanNode && n instanceof SingleInputPlanNode) {
+                                                       SingleInputPlanNode 
thisNode = (SingleInputPlanNode) n;
+                                                       SingleInputPlanNode 
otherNode = (SingleInputPlanNode) in;
+                                                       
+                                                       if 
(thisNode.getPredecessor() == otherNode) {
+                                                               // other node 
is child, remove it
+                                                               iter.remove();
+                                                       } else if 
(otherNode.getPredecessor() == thisNode) {
+                                                               shouldAdd = 
false;
+                                                       }
+                                               } else {
+                                                       throw new 
RuntimeException("Unrecodnized case in test.");
+                                               }
+                                       }
+                               }
+                               
+                               if (shouldAdd) {
+                                       list.add(n);
+                               }
+                       }
+                       
+                       this.map = map;
+               }
+               
+               
+               /**
+                * Returns the single plan node registered under the given operator name.
+                *
+                * @param name the operator name to look up
+                * @return the node, cast (unchecked) to the type expected by the caller
+                * @throws RuntimeException if no node or more than one node is registered under the name
+                */
+               @SuppressWarnings("unchecked")
+               public <T extends PlanNode> T getNode(String name) {
+                       final List<PlanNode> matches = this.map.get(name);
+
+                       if (matches == null || matches.isEmpty()) {
+                               throw new RuntimeException("No node found with the given name.");
+                       }
+                       if (matches.size() != 1) {
+                               throw new RuntimeException("Multiple nodes found with the given name.");
+                       }
+                       return (T) matches.get(0);
+               }
+               
+               /**
+                * Returns the single plan node that is registered under the given operator name
+                * and whose operator runs a user function of the given class.
+                *
+                * The function class is compared against the user-code class of the node's
+                * operator. Comparing it against the class of the plan node itself can never
+                * match, because a {@link PlanNode} class is never a {@link Function} class.
+                *
+                * @param name      the operator name to look up
+                * @param stubClass the class of the user function the node must execute
+                * @return the matching node, cast (unchecked) to the type expected by the caller
+                * @throws RuntimeException if no node or more than one node matches
+                */
+               @SuppressWarnings("unchecked")
+               public <T extends PlanNode> T getNode(String name, Class<? extends Function> stubClass) {
+                       List<PlanNode> nodes = this.map.get(name);
+                       if (nodes == null || nodes.isEmpty()) {
+                               throw new RuntimeException("No node found with the given name and stub class.");
+                       }
+
+                       PlanNode found = null;
+                       for (PlanNode node : nodes) {
+                               if (!runsUserFunctionOfClass(node, stubClass)) {
+                                       continue;
+                               }
+                               if (found != null) {
+                                       throw new RuntimeException("Multiple nodes found with the given name and stub class.");
+                               }
+                               found = node;
+                       }
+
+                       if (found == null) {
+                               throw new RuntimeException("No node found with the given name and stub class.");
+                       }
+                       return (T) found;
+               }
+
+               /**
+                * Checks whether the operator behind the given node wraps user code of exactly the given class.
+                * Operators without a user code wrapper never match.
+                */
+               private static boolean runsUserFunctionOfClass(PlanNode node, Class<?> stubClass) {
+                       final Operator<?> operator = node.getOriginalOptimizerNode().getOperator();
+                       return operator.getUserCodeWrapper() != null
+                                       && operator.getUserCodeWrapper().getUserCodeClass() == stubClass;
+               }
+               
+               /**
+                * Returns a copy of the list of plan nodes registered under the given operator name.
+                *
+                * @param name the operator name to look up
+                * @return a new list holding the registered nodes
+                * @throws RuntimeException if no node is registered under the name
+                */
+               public List<PlanNode> getNodes(String name) {
+                       final List<PlanNode> registered = this.map.get(name);
+                       final boolean nothingFound = registered == null || registered.isEmpty();
+                       if (nothingFound) {
+                               throw new RuntimeException("No node found with the given name.");
+                       }
+                       return new ArrayList<PlanNode>(registered);
+               }
+       }
+
+       /**
+        * Visitor that gathers every data source it encounters while traversing a plan,
+        * so that statistics can be attached to the sources afterwards. For bulk
+        * iterations it additionally traverses the operators behind the next partial solution.
+        */
+       public static class SourceCollectorVisitor implements Visitor<Operator<?>> {
+
+               /** The data sources collected so far. */
+               protected final List<GenericDataSourceBase<?, ?>> sources = new ArrayList<GenericDataSourceBase<?, ?>>(4);
+
+               /**
+                * Records data sources and descends into bulk iterations.
+                *
+                * @return always {@code true}
+                */
+               @Override
+               public boolean preVisit(Operator<?> visitable) {
+                       if (visitable instanceof GenericDataSourceBase) {
+                               final GenericDataSourceBase<?, ?> source = (GenericDataSourceBase<?, ?>) visitable;
+                               this.sources.add(source);
+                       } else if (visitable instanceof BulkIterationBase) {
+                               final BulkIterationBase<?> iteration = (BulkIterationBase<?>) visitable;
+                               iteration.getNextPartialSolution().accept(this);
+                       }
+                       return true;
+               }
+
+               /** Nothing to do after a node's inputs have been visited. */
+               @Override
+               public void postVisit(Operator<?> visitable) {}
+
+               /**
+                * @return the list of collected data sources (the visitor's own list, not a copy)
+                */
+               public List<GenericDataSourceBase<?, ?>> getSources() {
+                       return this.sources;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
new file mode 100644
index 0000000..b17e777
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DOPChangeTest.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.optimizer;
+
+import org.junit.Assert;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.record.operators.FileDataSink;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.record.operators.JoinOperator;
+import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.DummyInputFormat;
+import org.apache.flink.optimizer.util.DummyMatchStub;
+import org.apache.flink.optimizer.util.DummyOutputFormat;
+import org.apache.flink.optimizer.util.IdentityMap;
+import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.Visitor;
+import org.junit.Test;
+
+/**
+ * Tests in this class:
+ * <ul>
+ *   <li>Tests that check the correct handling of the properties and strategies in the case where the degree of
+ *       parallelism between tasks is increased or decreased.</li>
+ * </ul>
+ */
+@SuppressWarnings({"serial", "deprecation"})
+public class DOPChangeTest extends CompilerTestBase {
+       
+       /**
+        * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve 
all fields (hence all properties).
+        * 
+        * Increases DOP between 1st reduce and 2nd map, so the hash 
partitioning from 1st reduce is not reusable.
+        * Expected to re-establish partitioning between reduce and map, via 
hash, because random is a full network
+        * transit as well.
+        */
+       @Test
+       public void checkPropertyHandlingWithIncreasingGlobalParallelism1() {
+               final int degOfPar = DEFAULT_PARALLELISM;
+               
+               // construct the plan
+               FileDataSource source = new FileDataSource(new 
DummyInputFormat(), IN_FILE, "Source");
+               source.setDegreeOfParallelism(degOfPar);
+               
+               MapOperator map1 = MapOperator.builder(new 
IdentityMap()).name("Map1").build();
+               map1.setDegreeOfParallelism(degOfPar);
+               map1.setInput(source);
+               
+               ReduceOperator reduce1 = ReduceOperator.builder(new 
IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+               reduce1.setDegreeOfParallelism(degOfPar);
+               reduce1.setInput(map1);
+               
+               MapOperator map2 = MapOperator.builder(new 
IdentityMap()).name("Map2").build();
+               map2.setDegreeOfParallelism(degOfPar * 2);
+               map2.setInput(reduce1);
+               
+               ReduceOperator reduce2 = ReduceOperator.builder(new 
IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+               reduce2.setDegreeOfParallelism(degOfPar * 2);
+               reduce2.setInput(map2);
+               
+               FileDataSink sink = new FileDataSink(new DummyOutputFormat(), 
OUT_FILE, "Sink");
+               sink.setDegreeOfParallelism(degOfPar * 2);
+               sink.setInput(reduce2);
+               
+               Plan plan = new Plan(sink, "Test Increasing Degree Of 
Parallelism");
+               
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+               
+               // check the optimized Plan
+               // when reducer 1 distributes its data across the instances of 
map2, it needs to employ a local hash method,
+               // because map2 has twice as many instances and key/value pairs 
with the same key need to be processed by the same
+               // mapper respectively reducer
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SingleInputPlanNode red2Node = (SingleInputPlanNode) 
sinkNode.getPredecessor();
+               SingleInputPlanNode map2Node = (SingleInputPlanNode) 
red2Node.getPredecessor();
+               
+               ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
+               ShipStrategyType redIn = red2Node.getInput().getShipStrategy();
+               
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.PARTITION_HASH, mapIn);
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.FORWARD, redIn);
+       }
+       
+       /**
+        * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve 
all fields (hence all properties).
+        * 
+        * Increases DOP between 2nd map and 2nd reduce, so the hash 
partitioning from 1st reduce is not reusable.
+        * Expected to re-establish partitioning between map and reduce (hash).
+        */
+       @Test
+       public void checkPropertyHandlingWithIncreasingGlobalParallelism2() {
+               final int degOfPar = DEFAULT_PARALLELISM;
+               
+               // construct the plan
+               FileDataSource source = new FileDataSource(new 
DummyInputFormat(), IN_FILE, "Source");
+               source.setDegreeOfParallelism(degOfPar);
+               
+               MapOperator map1 = MapOperator.builder(new 
IdentityMap()).name("Map1").build();
+               map1.setDegreeOfParallelism(degOfPar);
+               map1.setInput(source);
+               
+               ReduceOperator reduce1 = ReduceOperator.builder(new 
IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+               reduce1.setDegreeOfParallelism(degOfPar);
+               reduce1.setInput(map1);
+               
+               MapOperator map2 = MapOperator.builder(new 
IdentityMap()).name("Map2").build();
+               map2.setDegreeOfParallelism(degOfPar);
+               map2.setInput(reduce1);
+               
+               ReduceOperator reduce2 = ReduceOperator.builder(new 
IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+               reduce2.setDegreeOfParallelism(degOfPar * 2);
+               reduce2.setInput(map2);
+               
+               FileDataSink sink = new FileDataSink(new DummyOutputFormat(), 
OUT_FILE, "Sink");
+               sink.setDegreeOfParallelism(degOfPar * 2);
+               sink.setInput(reduce2);
+               
+               Plan plan = new Plan(sink, "Test Increasing Degree Of 
Parallelism");
+               
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+               
+               // check the optimized Plan
+               // when reducer 1 distributes its data across the instances of 
map2, it needs to employ a local hash method,
+               // because map2 has twice as many instances and key/value pairs 
with the same key need to be processed by the same
+               // mapper respectively reducer
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SingleInputPlanNode red2Node = (SingleInputPlanNode) 
sinkNode.getPredecessor();
+               SingleInputPlanNode map2Node = (SingleInputPlanNode) 
red2Node.getPredecessor();
+               
+               ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
+               ShipStrategyType reduceIn = 
red2Node.getInput().getShipStrategy();
+               
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.FORWARD, mapIn);
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.PARTITION_HASH, reduceIn);
+       }
+       
+       /**
+        * Simple Job: Map -> Reduce -> Map -> Reduce. All functions preserve 
all fields (hence all properties).
+        * 
+        * Increases DOP between 1st reduce and 2nd map, such that more tasks 
are on one instance.
+        * Expected to re-establish partitioning between map and reduce via a 
local hash.
+        */
+       @Test
+       public void checkPropertyHandlingWithIncreasingLocalParallelism() {
+               final int degOfPar = 2 * DEFAULT_PARALLELISM;
+               
+               // construct the plan
+               FileDataSource source = new FileDataSource(new 
DummyInputFormat(), IN_FILE, "Source");
+               source.setDegreeOfParallelism(degOfPar);
+               
+               MapOperator map1 = MapOperator.builder(new 
IdentityMap()).name("Map1").build();
+               map1.setDegreeOfParallelism(degOfPar);
+               map1.setInput(source);
+               
+               ReduceOperator reduce1 = ReduceOperator.builder(new 
IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+               reduce1.setDegreeOfParallelism(degOfPar);
+               reduce1.setInput(map1);
+               
+               MapOperator map2 = MapOperator.builder(new 
IdentityMap()).name("Map2").build();
+               map2.setDegreeOfParallelism(degOfPar * 2);
+               map2.setInput(reduce1);
+               
+               ReduceOperator reduce2 = ReduceOperator.builder(new 
IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+               reduce2.setDegreeOfParallelism(degOfPar * 2);
+               reduce2.setInput(map2);
+               
+               FileDataSink sink = new FileDataSink(new DummyOutputFormat(), 
OUT_FILE, "Sink");
+               sink.setDegreeOfParallelism(degOfPar * 2);
+               sink.setInput(reduce2);
+               
+               Plan plan = new Plan(sink, "Test Increasing Degree Of 
Parallelism");
+               
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+               
+               // check the optimized Plan
+               // when reducer 1 distributes its data across the instances of 
map2, it needs to employ a local hash method,
+               // because map2 has twice as many instances and key/value pairs 
with the same key need to be processed by the same
+               // mapper respectively reducer
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SingleInputPlanNode red2Node = (SingleInputPlanNode) 
sinkNode.getPredecessor();
+               SingleInputPlanNode map2Node = (SingleInputPlanNode) 
red2Node.getPredecessor();
+               
+               ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
+               ShipStrategyType reduceIn = 
red2Node.getInput().getShipStrategy();
+               
+               Assert.assertTrue("Invalid ship strategy for an operator.", 
+                               (ShipStrategyType.PARTITION_RANDOM ==  mapIn && 
ShipStrategyType.PARTITION_HASH == reduceIn) || 
+                               (ShipStrategyType.PARTITION_HASH == mapIn && 
ShipStrategyType.FORWARD == reduceIn));
+       }
+       
+       
+       
+       /**
+        * Simple Job: Map -> Reduce -> Map -> Reduce.
+        * 
+        * The DOP is halved between the 1st reduce and the 2nd map. The test checks that a hash
+        * partitioning and a sort are established somewhere in front of the 2nd reduce: each
+        * of the two may sit on the input of the 2nd map or on the input of the 2nd reduce.
+        */
+       @Test
+       public void checkPropertyHandlingWithDecreasingDegreeOfParallelism() {
+               final int degOfPar = DEFAULT_PARALLELISM;
+
+               // construct the plan
+               FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
+               source.setDegreeOfParallelism(degOfPar * 2);
+
+               MapOperator map1 = MapOperator.builder(new IdentityMap()).name("Map1").build();
+               map1.setDegreeOfParallelism(degOfPar * 2);
+               map1.setInput(source);
+
+               ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 1").build();
+               reduce1.setDegreeOfParallelism(degOfPar * 2);
+               reduce1.setInput(map1);
+
+               MapOperator map2 = MapOperator.builder(new IdentityMap()).name("Map2").build();
+               map2.setDegreeOfParallelism(degOfPar);
+               map2.setInput(reduce1);
+
+               ReduceOperator reduce2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).name("Reduce 2").build();
+               reduce2.setDegreeOfParallelism(degOfPar);
+               reduce2.setInput(map2);
+
+               FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
+               sink.setDegreeOfParallelism(degOfPar);
+               sink.setInput(reduce2);
+
+               // the plan name states what this test does: the DOP goes down, not up
+               Plan plan = new Plan(sink, "Test Decreasing Degree Of Parallelism");
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               // map2 runs with half as many instances as reducer 1, so the partitioning of reducer 1
+               // cannot be reused; key/value pairs with the same key must be brought together again
+               // (hash partitioning) and sorted before reducer 2 can process them
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+               SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+
+               Assert.assertTrue("The no sorting local strategy.",
+                               LocalStrategy.SORT == red2Node.getInput().getLocalStrategy() ||
+                                               LocalStrategy.SORT == map2Node.getInput().getLocalStrategy());
+
+               Assert.assertTrue("The no partitioning ship strategy.",
+                               ShipStrategyType.PARTITION_HASH == red2Node.getInput().getShipStrategy() ||
+                                               ShipStrategyType.PARTITION_HASH == map2Node.getInput().getShipStrategy());
+       }
+
+       /**
+        * Verifies that data is re-partitioned when the two inputs of a two-input operator
+        * run with different DOPs.
+        * 
+        * Test Plan:
+        * <pre>
+        * 
+        * (source) -> reduce -\
+        *                      Match -> (sink)
+        * (source) -> reduce -/
+        * 
+        * </pre>
+        * 
+        * The first branch and the match run with DOP 5, the second branch with DOP 7. The first
+        * input of the match is expected to be forwarded, the second to be hash-partitioned.
+        */
+       @Test
+       public void checkPropertyHandlingWithTwoInputs() {
+               // build the two source -> reduce branches
+               FileDataSource firstSource = new FileDataSource(new DummyInputFormat(), IN_FILE);
+               FileDataSource secondSource = new FileDataSource(new DummyInputFormat(), IN_FILE);
+
+               ReduceOperator firstReduce = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
+                       .input(firstSource)
+                       .build();
+               ReduceOperator secondReduce = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
+                       .input(secondSource)
+                       .build();
+
+               // join both branches and write the result
+               JoinOperator match = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
+                       .input1(firstReduce)
+                       .input2(secondReduce)
+                       .build();
+
+               FileDataSink output = new FileDataSink(new DummyOutputFormat(), OUT_FILE, match);
+
+               // first branch, match and sink share DOP 5; the second branch uses DOP 7
+               firstSource.setDegreeOfParallelism(5);
+               firstReduce.setDegreeOfParallelism(5);
+               secondSource.setDegreeOfParallelism(7);
+               secondReduce.setDegreeOfParallelism(7);
+               match.setDegreeOfParallelism(5);
+               output.setDegreeOfParallelism(5);
+
+               Plan program = new Plan(output, "Partition on DoP Change");
+               OptimizedPlan optimized = compileNoStats(program);
+
+               // generating the job graph must not throw
+               new JobGraphGenerator().compileJobGraph(optimized);
+
+               optimized.accept(new Visitor<PlanNode>() {
+
+                       @Override
+                       public boolean preVisit(PlanNode visitable) {
+                               if (!(visitable instanceof DualInputPlanNode)) {
+                                       return true;
+                               }
+
+                               final DualInputPlanNode matchNode = (DualInputPlanNode) visitable;
+                               final Channel firstInput = matchNode.getInput1();
+                               final Channel secondInput = matchNode.getInput2();
+
+                               Assert.assertEquals("Incompatible shipping strategy chosen for match",
+                                               ShipStrategyType.FORWARD, firstInput.getShipStrategy());
+                               Assert.assertEquals("Incompatible shipping strategy chosen for match",
+                                               ShipStrategyType.PARTITION_HASH, secondInput.getShipStrategy());
+
+                               // no need to descend below the match
+                               return false;
+                       }
+
+                       @Override
+                       public void postVisit(PlanNode visitable) {
+                               // nothing to do
+                       }
+               });
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
new file mode 100644
index 0000000..aaee975
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.junit.Test;
+/** Checks that a program made of two unconnected data flows can be optimized and translated into a job graph. */
+@SuppressWarnings("serial")
+public class DisjointDataFlowsTest extends CompilerTestBase {
+       /** Builds two independent sequence-to-print flows and runs them through the optimizer and the job graph generator. */
+       @Test
+       public void testDisjointFlows() {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+                       
+                       // generate two different flows; neither one consumes the other's data
+                       env.generateSequence(1, 10).print();
+                       env.generateSequence(1, 10).print();
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       // the generated job graph is discarded: the test passes when translation does not throw
+                       new JobGraphGenerator().compileJobGraph(op);
+               }
+               catch (Exception e) { // any exception from planning or translation fails the test
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
new file mode 100644
index 0000000..34aa9f8
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.optimizer;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.DistinctOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+// NOTE(review): Serializable is presumably required because the anonymous KeySelector below captures this test instance - confirm.
+/** Checks the plans the optimizer produces for distinct(): node wiring, driver strategies, key fields and parallelism. */
+@SuppressWarnings("serial")
+public class DistinctCompilationTest extends CompilerTestBase implements java.io.Serializable {
+       /** distinct() without arguments: expects source -> combiner -> reducer -> sink, keyed on fields 0 and 1. */
+       @Test
+       public void testDistinctPlain() {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(8);
+
+                       DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+                                       .name("source").setParallelism(6);
+
+                       data
+                                       .distinct().name("reducer")
+                                       .print().name("sink");
+
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+
+                       OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+                       // look up the plan nodes by the names assigned in the program above
+                       SourcePlanNode sourceNode = resolver.getNode("source");
+                       SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+                       SinkPlanNode sinkNode = resolver.getNode("sink");
+
+                       // get the combiner: the node that feeds the reducer's input
+                       SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+                       // check wiring: source -> combiner -> reducer -> sink
+                       assertEquals(sourceNode, combineNode.getInput().getSource());
+                       assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+                       // check the driver strategies: sorted group reduce, preceded by the sorted group combine
+                       assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+                       assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+
+                       // check the keys: both tuple fields are used
+                       assertEquals(new FieldList(0, 1), reduceNode.getKeys(0));
+                       assertEquals(new FieldList(0, 1), combineNode.getKeys(0));
+                       assertEquals(new FieldList(0, 1), reduceNode.getInput().getLocalStrategyKeys());
+
+                       // check DOP: the combiner keeps the source's 6, reducer and sink use the environment's 8
+                       assertEquals(6, sourceNode.getParallelism());
+                       assertEquals(6, combineNode.getParallelism());
+                       assertEquals(8, reduceNode.getParallelism());
+                       assertEquals(8, sinkNode.getParallelism());
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+               }
+       }
+       /** distinct(KeySelector): expects an additional key-extractor node between source and combiner, keyed on field 0. */
+       @Test
+       public void testDistinctWithSelectorFunctionKey() {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(8);
+
+                       DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+                                       .name("source").setParallelism(6);
+
+                       data
+                                       .distinct(new KeySelector<Tuple2<String,Double>, String>() {
+                                               public String getKey(Tuple2<String, Double> value) { return value.f0; }
+                                       }).name("reducer")
+                                       .print().name("sink");
+
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+
+                       OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+                       // look up the plan nodes by the names assigned in the program above
+                       SourcePlanNode sourceNode = resolver.getNode("source");
+                       SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+                       SinkPlanNode sinkNode = resolver.getNode("sink");
+
+                       // get the combiner: the node that feeds the reducer's input
+                       SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+                       // get the node in front of the combiner (key extractor) and the node in front of the sink (key projector)
+                       SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
+                       SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
+
+                       // check wiring. NOTE(review): the second assertion compares keyProjector with the expression it was initialized from, so it always holds - confirm intent
+                       assertEquals(sourceNode, keyExtractor.getInput().getSource());
+                       assertEquals(keyProjector, sinkNode.getInput().getSource());
+
+                       // check the driver strategies: sorted group reduce, preceded by the sorted group combine
+                       assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+                       assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+
+                       // check the keys: only field 0 is used
+                       assertEquals(new FieldList(0), reduceNode.getKeys(0));
+                       assertEquals(new FieldList(0), combineNode.getKeys(0));
+                       assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
+
+                       // check DOP: nodes up to the combiner keep the source's 6
+                       assertEquals(6, sourceNode.getParallelism());
+                       assertEquals(6, keyExtractor.getParallelism());
+                       assertEquals(6, combineNode.getParallelism());
+                       // reducer and everything after it use the environment's 8
+                       assertEquals(8, reduceNode.getParallelism());
+                       assertEquals(8, keyProjector.getParallelism());
+                       assertEquals(8, sinkNode.getParallelism());
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+               }
+       }
+       /** distinct(1): expects source -> combiner -> reducer -> sink, keyed on field 1 only. */
+       @Test
+       public void testDistinctWithFieldPositionKeyCombinable() {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(8);
+
+                       DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+                                       .name("source").setParallelism(6);
+
+                       DistinctOperator<Tuple2<String, Double>> reduced = data
+                                       .distinct(1).name("reducer");
+
+                       reduced.print().name("sink");
+
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+
+                       OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+                       // look up the plan nodes by the names assigned in the program above
+                       SourcePlanNode sourceNode = resolver.getNode("source");
+                       SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+                       SinkPlanNode sinkNode = resolver.getNode("sink");
+
+                       // get the combiner: the node that feeds the reducer's input
+                       SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+                       // check wiring: source -> combiner -> reducer -> sink
+                       assertEquals(sourceNode, combineNode.getInput().getSource());
+                       assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+                       // check the driver strategies: sorted group reduce, preceded by the sorted group combine
+                       assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+                       assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+
+                       // check the keys: only field 1 is used
+                       assertEquals(new FieldList(1), reduceNode.getKeys(0));
+                       assertEquals(new FieldList(1), combineNode.getKeys(0));
+                       assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
+
+                       // check DOP: the combiner keeps the source's 6, reducer and sink use the environment's 8
+                       assertEquals(6, sourceNode.getParallelism());
+                       assertEquals(6, combineNode.getParallelism());
+                       assertEquals(8, reduceNode.getParallelism());
+                       assertEquals(8, sinkNode.getParallelism());
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+               }
+       }
+}
\ No newline at end of file

Reply via email to