http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
new file mode 100644
index 0000000..f6885c5
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.fail;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.record.operators.FileDataSink;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.DummyInputFormat;
+import org.apache.flink.optimizer.util.DummyOutputFormat;
+import org.apache.flink.optimizer.util.IdentityReduce;
+import org.junit.Test;
+
+/**
+ * Regression test for a bug that occurred when the ReduceOperator was used
+ * without a grouping key; compiling such a plan must not throw a CompilerException.
+ */
+@SuppressWarnings({"serial", "deprecation"})
+public class ReduceAllTest extends CompilerTestBase {
+
+       @Test
+       public void testReduce() {
+               // construct the plan: source -> reduce (the builder gets no key fields) -> sink
+               FileDataSource source = new FileDataSource(new 
DummyInputFormat(), IN_FILE, "Source");
+               ReduceOperator reduce1 = ReduceOperator.builder(new 
IdentityReduce()).name("Reduce1").input(source).build();
+               FileDataSink sink = new FileDataSink(new DummyOutputFormat(), 
OUT_FILE, "Sink");
+               sink.setInput(reduce1);
+               Plan plan = new Plan(sink, "AllReduce Test");
+               plan.setDefaultParallelism(DEFAULT_PARALLELISM);
+               // optimize the plan and translate the optimized plan into a job graph;
+               // a CompilerException thrown by either call fails the test
+               try {
+                       OptimizedPlan oPlan = compileNoStats(plan);
+                       JobGraphGenerator jobGen = new JobGraphGenerator();
+                       jobGen.compileJobGraph(oPlan);
+               } catch(CompilerException ce) {
+                       ce.printStackTrace();
+                       fail("The pact compiler is unable to compile this plan 
correctly");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
new file mode 100644
index 0000000..da44b59
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.io.ReplicatingInputFormat;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+@SuppressWarnings({"serial", "deprecation"})
+public class ReplicatingDataSourceTest extends CompilerTestBase {
+
+       /**
+        * Tests that a join with a replicated data source as its first input gets the FORWARD ship strategy on both inputs.
+        */
+       @Test
+       public void checkJoinWithReplicatedSourceInput() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+               // CSV input format wrapped in a ReplicatingInputFormat
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, 
FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), 
String.class));
+               // source1 reads through the replicating format, source2 is created with readCsvFile (no replication wrapper)
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new 
TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = 
env.readCsvFile("/some/otherpath").types(String.class);
+               // join source1 with source2 (key expression * on each side) and write the result as text
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .join(source2).where("*").equalTo("*")
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized plan:
+               // the join should have the forward strategy on both sides
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode joinNode = (DualInputPlanNode) 
sinkNode.getPredecessor();
+
+               ShipStrategyType joinIn1 = 
joinNode.getInput1().getShipStrategy();
+               ShipStrategyType joinIn2 = 
joinNode.getInput2().getShipStrategy();
+
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.FORWARD, joinIn1);
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.FORWARD, joinIn2);
+       }
+
+       /**
+        * Tests that a join gets the FORWARD ship strategy on both inputs when the replicated data source is followed by a map.
+        */
+       @Test
+       public void checkJoinWithReplicatedSourceInputBehindMap() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+               // CSV input format wrapped in a ReplicatingInputFormat
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, 
FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), 
String.class));
+               // source1 reads through the replicating format, source2 is created with readCsvFile (no replication wrapper)
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new 
TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = 
env.readCsvFile("/some/otherpath").types(String.class);
+               // apply IdMap to source1, then join with source2 (key expression * on each side) and write the result as text
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .map(new IdMap())
+                               .join(source2).where("*").equalTo("*")
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized plan:
+               // the join should have the forward strategy on both sides
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode joinNode = (DualInputPlanNode) 
sinkNode.getPredecessor();
+
+               ShipStrategyType joinIn1 = 
joinNode.getInput1().getShipStrategy();
+               ShipStrategyType joinIn2 = 
joinNode.getInput2().getShipStrategy();
+
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.FORWARD, joinIn1);
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.FORWARD, joinIn2);
+       }
+
+       /**
+        * Tests that a join gets the FORWARD ship strategy on both inputs when the replicated data source is followed by a filter.
+        */
+       @Test
+       public void checkJoinWithReplicatedSourceInputBehindFilter() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+               // CSV input format wrapped in a ReplicatingInputFormat
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, 
FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), 
String.class));
+               // source1 reads through the replicating format, source2 is created with readCsvFile (no replication wrapper)
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new 
TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = 
env.readCsvFile("/some/otherpath").types(String.class);
+               // apply NoFilter to source1, then join with source2 (key expression * on each side) and write the result as text
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .filter(new NoFilter())
+                               .join(source2).where("*").equalTo("*")
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized plan:
+               // the join should have the forward strategy on both sides
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode joinNode = (DualInputPlanNode) 
sinkNode.getPredecessor();
+
+               ShipStrategyType joinIn1 = 
joinNode.getInput1().getShipStrategy();
+               ShipStrategyType joinIn2 = 
joinNode.getInput2().getShipStrategy();
+
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.FORWARD, joinIn1);
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.FORWARD, joinIn2);
+       }
+
+       /**
+        * Tests that a join gets the FORWARD ship strategy on both inputs when the replicated data source is followed by a flatMap.
+        */
+       @Test
+       public void checkJoinWithReplicatedSourceInputBehindFlatMap() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+               // CSV input format wrapped in a ReplicatingInputFormat
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, 
FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), 
String.class));
+               // source1 reads through the replicating format, source2 is created with readCsvFile (no replication wrapper)
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new 
TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = 
env.readCsvFile("/some/otherpath").types(String.class);
+               // apply IdFlatMap to source1, then join with source2 (key expression * on each side) and write the result as text
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .flatMap(new IdFlatMap())
+                               .join(source2).where("*").equalTo("*")
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized plan:
+               // the join should have the forward strategy on both sides
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode joinNode = (DualInputPlanNode) 
sinkNode.getPredecessor();
+
+               ShipStrategyType joinIn1 = 
joinNode.getInput1().getShipStrategy();
+               ShipStrategyType joinIn2 = 
joinNode.getInput2().getShipStrategy();
+
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.FORWARD, joinIn1);
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.FORWARD, joinIn2);
+       }
+
+       /**
+        * Tests that a join gets the FORWARD ship strategy on both inputs when the replicated data source is followed by a mapPartition.
+        */
+       @Test
+       public void checkJoinWithReplicatedSourceInputBehindMapPartition() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+               // CSV input format wrapped in a ReplicatingInputFormat
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, 
FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), 
String.class));
+               // source1 reads through the replicating format, source2 is created with readCsvFile (no replication wrapper)
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new 
TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = 
env.readCsvFile("/some/otherpath").types(String.class);
+               // apply IdPMap to source1, then join with source2 (key expression * on each side) and write the result as text
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .mapPartition(new IdPMap())
+                               .join(source2).where("*").equalTo("*")
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized plan:
+               // the join should have the forward strategy on both sides
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode joinNode = (DualInputPlanNode) 
sinkNode.getPredecessor();
+
+               ShipStrategyType joinIn1 = 
joinNode.getInput1().getShipStrategy();
+               ShipStrategyType joinIn2 = 
joinNode.getInput2().getShipStrategy();
+
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.FORWARD, joinIn1);
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.FORWARD, joinIn2);
+       }
+
+       /**
+        * Tests that a join gets the FORWARD ship strategy on both inputs when the replicated data source is followed by a chain of filter, mapPartition, flatMap and map.
+        */
+       @Test
+       public void checkJoinWithReplicatedSourceInputBehindMultiMaps() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+               // CSV input format wrapped in a ReplicatingInputFormat
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, 
FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), 
String.class));
+               // source1 reads through the replicating format, source2 is created with readCsvFile (no replication wrapper)
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new 
TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = 
env.readCsvFile("/some/otherpath").types(String.class);
+               // chain four operators on source1, then join with source2 (key expression * on each side) and write the result as text
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .filter(new NoFilter())
+                               .mapPartition(new IdPMap())
+                               .flatMap(new IdFlatMap())
+                               .map(new IdMap())
+                               .join(source2).where("*").equalTo("*")
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized plan:
+               // the join should have the forward strategy on both sides
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode joinNode = (DualInputPlanNode) 
sinkNode.getPredecessor();
+
+               ShipStrategyType joinIn1 = 
joinNode.getInput1().getShipStrategy();
+               ShipStrategyType joinIn2 = 
joinNode.getInput2().getShipStrategy();
+
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.FORWARD, joinIn1);
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.FORWARD, joinIn2);
+       }
+
+       /**
+        * Tests that a cross with a replicated data source as its first input gets the FORWARD ship strategy on both inputs.
+        */
+       @Test
+       public void checkCrossWithReplicatedSourceInput() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+               // CSV input format wrapped in a ReplicatingInputFormat
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, 
FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), 
String.class));
+               // source1 reads through the replicating format, source2 is created with readCsvFile (no replication wrapper)
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new 
TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = 
env.readCsvFile("/some/otherpath").types(String.class);
+               // cross source1 with source2 and write the result as text
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .cross(source2)
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized plan:
+               // the cross should have the forward strategy on both sides
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode crossNode = (DualInputPlanNode) 
sinkNode.getPredecessor();
+
+               ShipStrategyType crossIn1 = 
crossNode.getInput1().getShipStrategy();
+               ShipStrategyType crossIn2 = 
crossNode.getInput2().getShipStrategy();
+
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.FORWARD, crossIn1);
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.FORWARD, crossIn2);
+       }
+
+       /**
+        * Tests that a cross gets the FORWARD ship strategy on both inputs when the replicated data source is followed by a map and a filter.
+        */
+       @Test
+       public void checkCrossWithReplicatedSourceInputBehindMap() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+               // CSV input format wrapped in a ReplicatingInputFormat
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, 
FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), 
String.class));
+               // source1 reads through the replicating format, source2 is created with readCsvFile (no replication wrapper)
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new 
TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = 
env.readCsvFile("/some/otherpath").types(String.class);
+               // apply IdMap and NoFilter to source1, then cross with source2 and write the result as text
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .map(new IdMap())
+                               .filter(new NoFilter())
+                               .cross(source2)
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized plan:
+               // the cross should have the forward strategy on both sides
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode crossNode = (DualInputPlanNode) 
sinkNode.getPredecessor();
+
+               ShipStrategyType crossIn1 = 
crossNode.getInput1().getShipStrategy();
+               ShipStrategyType crossIn2 = 
crossNode.getInput2().getShipStrategy();
+
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.FORWARD, crossIn1);
+               Assert.assertEquals("Invalid ship strategy for an operator.", 
ShipStrategyType.FORWARD, crossIn2);
+       }
+
+       /**
+        * Tests that the compiler fails with a CompilerException for a join with a replicated data source when the join is given a different parallelism (DEFAULT_PARALLELISM+2).
+        */
+       @Test(expected = CompilerException.class)
+       public void checkJoinWithReplicatedSourceInputChangingDOP() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+               // CSV input format wrapped in a ReplicatingInputFormat
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, 
FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), 
String.class));
+               // source1 reads through the replicating format, source2 is created with readCsvFile (no replication wrapper)
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new 
TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = 
env.readCsvFile("/some/otherpath").types(String.class);
+               // join source1 with source2, set the parallelism of the join to DEFAULT_PARALLELISM+2 and write the result as text
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               
.join(source2).where("*").equalTo("*").setParallelism(DEFAULT_PARALLELISM+2)
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler, which is expected to throw a CompilerException
+               OptimizedPlan oPlan = compileNoStats(plan);
+       }
+
+       /**
+        * Tests that the compiler fails with a CompilerException for a join with a replicated data source behind a map when the map is given a different parallelism (DEFAULT_PARALLELISM+1).
+        */
+       @Test(expected = CompilerException.class)
+       public void checkJoinWithReplicatedSourceInputBehindMapChangingDOP() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+               // CSV input format wrapped in a ReplicatingInputFormat
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, 
FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), 
String.class));
+               // source1 reads through the replicating format, source2 is created with readCsvFile (no replication wrapper)
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new 
TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = 
env.readCsvFile("/some/otherpath").types(String.class);
+               // apply IdMap with parallelism DEFAULT_PARALLELISM+1 to source1, then join with source2 and write the result as text
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .map(new 
IdMap()).setParallelism(DEFAULT_PARALLELISM+1)
+                               .join(source2).where("*").equalTo("*")
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler, which is expected to throw a CompilerException
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+       }
+
+       /**
+        * Tests that the compiler fails with a CompilerException for a join whose replicated data source is reduced (reduce called directly on the data set) before the join.
+        */
+       @Test(expected = CompilerException.class)
+       public void checkJoinWithReplicatedSourceInputBehindReduce() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+               // CSV input format wrapped in a ReplicatingInputFormat
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, 
FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), 
String.class));
+               // source1 reads through the replicating format, source2 is created with readCsvFile (no replication wrapper)
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new 
TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = 
env.readCsvFile("/some/otherpath").types(String.class);
+               // reduce source1 with LastReduce, then join with source2 (key expression * on each side) and write the result as text
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .reduce(new LastReduce())
+                               .join(source2).where("*").equalTo("*")
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler, which is expected to throw a CompilerException
+               OptimizedPlan oPlan = compileNoStats(plan);
+       }
+
+       /**
+        * Tests that the compiler fails with a CompilerException for a join whose replicated data source is rebalanced before the join.
+        */
+       @Test(expected = CompilerException.class)
+       public void checkJoinWithReplicatedSourceInputBehindRebalance() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+               // CSV input format wrapped in a ReplicatingInputFormat
+               ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+                               new ReplicatingInputFormat<Tuple1<String>, 
FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), 
String.class));
+               // source1 reads through the replicating format, source2 is created with readCsvFile (no replication wrapper)
+               DataSet<Tuple1<String>> source1 = env.createInput(rif, new 
TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+               DataSet<Tuple1<String>> source2 = 
env.readCsvFile("/some/otherpath").types(String.class);
+               // rebalance source1, then join with source2 (key expression * on each side) and write the result as text
+               DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+                               .rebalance()
+                               .join(source2).where("*").equalTo("*")
+                               .writeAsText("/some/newpath");
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler, which is expected to throw a CompilerException
+               OptimizedPlan oPlan = compileNoStats(plan);
+       }
+
+
+       public static class IdMap<T> implements MapFunction<T,T> {
+               // identity map: returns every value unchanged
+               @Override
+               public T map(T value) throws Exception {
+                       return value;
+               }
+       }
+
+       public static class NoFilter<T> implements FilterFunction<T> {
+               // filters nothing: returning true retains the value, so every record passes through
+               @Override
+               public boolean filter(T value) throws Exception {
+                       return true;
+               }
+       }
+
+       public static class IdFlatMap<T> implements FlatMapFunction<T,T> {
+               // identity flat map: collects each value once, unchanged
+               @Override
+               public void flatMap(T value, Collector<T> out) throws Exception 
{
+                       out.collect(value);
+               }
+       }
+
+       public static class IdPMap<T> implements MapPartitionFunction<T,T> {
+               // identity map partition: collects every value of the iterable in iteration order, unchanged
+               @Override
+               public void mapPartition(Iterable<T> values, Collector<T> out) 
throws Exception {
+                       for(T v : values) {
+                               out.collect(v);
+                       }
+               }
+       }
+
+       public static class LastReduce<T> implements ReduceFunction<T> {
+               // reduces a pair of values by always returning the second one
+               @Override
+               public T reduce(T value1, T value2) throws Exception {
+                       return value2;
+               }
+       }
+
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
new file mode 100644
index 0000000..1fe16bb
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Visitor;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verifies that forwarded-field declarations made through the Java API
+ * ({@code withForwardedFields}) reach the optimizer: operators whose input is
+ * already partitioned as required are expected to use a FORWARD ship strategy.
+ */
+@SuppressWarnings("serial")
+public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {
+
+       /**
+        * Builds map -> reduce(key 0) -> map -> reduce(key 1), where the first
+        * reducer declares {@code f0->f1}, and checks every visited reducer and
+        * mapper. The visitor stops descending at the first mapper it meets, so
+        * operators below that mapper are not inspected.
+        */
+       @Test
+       public void forwardFieldsTestMapReduce() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set =
+                               env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               set = set.map(new MockMapper()).withForwardedFields("*")
+                               .groupBy(0)
+                               .reduce(new MockReducer()).withForwardedFields("f0->f1")
+                               .map(new MockMapper()).withForwardedFields("*")
+                               .groupBy(1)
+                               .reduce(new MockReducer()).withForwardedFields("*");
+
+               set.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               oPlan.accept(new Visitor<PlanNode>() {
+                       @Override
+                       public boolean preVisit(PlanNode visitable) {
+                               final boolean singleInput = visitable instanceof SingleInputPlanNode;
+
+                               if (singleInput && visitable.getProgramOperator() instanceof ReduceOperatorBase) {
+                                       assertForwardedAndHashPartitionedOnField1(visitable, "Reduce", "Reducer");
+                               }
+                               if (singleInput && visitable.getProgramOperator() instanceof MapOperatorBase) {
+                                       assertForwardedAndHashPartitionedOnField1(visitable, "Map", "Mapper");
+                                       // do not descend below the first mapper
+                                       return false;
+                               }
+                               return true;
+                       }
+
+                       @Override
+                       public void postVisit(PlanNode visitable) {
+                               // nothing to do after a node has been visited
+                       }
+               });
+       }
+
+       /**
+        * Joins two reduced inputs on the fields their reducers forwarded the
+        * grouping key to (f1 on the left, f2 on the right) and expects both join
+        * inputs to be forwarded.
+        */
+       @Test
+       public void forwardFieldsTestJoin() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> in1 =
+                               env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> in2 =
+                               env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               in1 = in1.map(new MockMapper()).withForwardedFields("*")
+                               .groupBy(0)
+                               .reduce(new MockReducer()).withForwardedFields("f0->f1");
+               in2 = in2.map(new MockMapper()).withForwardedFields("*")
+                               .groupBy(1)
+                               .reduce(new MockReducer()).withForwardedFields("f1->f2");
+               DataSet<Tuple3<Integer, Integer, Integer>> out =
+                               in1.join(in2).where(1).equalTo(2).with(new MockJoin());
+
+               out.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               oPlan.accept(new Visitor<PlanNode>() {
+                       @Override
+                       public boolean preVisit(PlanNode visitable) {
+                               if (visitable instanceof DualInputPlanNode
+                                               && visitable.getProgramOperator() instanceof JoinOperatorBase) {
+                                       final DualInputPlanNode node = (DualInputPlanNode) visitable;
+
+                                       final Channel inConn1 = node.getInput1();
+                                       final Channel inConn2 = node.getInput2();
+
+                                       // assertEquals reports the offending strategy when the check fails
+                                       Assert.assertEquals("Join should just forward the input if it is already partitioned",
+                                                       ShipStrategyType.FORWARD, inConn1.getShipStrategy());
+                                       Assert.assertEquals("Join should just forward the input if it is already partitioned",
+                                                       ShipStrategyType.FORWARD, inConn2.getShipStrategy());
+                                       // the join is the only node of interest; skip everything below it
+                                       return false;
+                               }
+                               return true;
+                       }
+
+                       @Override
+                       public void postVisit(PlanNode visitable) {
+                               // nothing to do after a node has been visited
+                       }
+               });
+       }
+
+       /**
+        * Shared checks for the reducer and mapper nodes of
+        * {@link #forwardFieldsTestMapReduce()}: every input must be forwarded and
+        * the node itself must be hash-partitioned and grouped on field 1.
+        *
+        * @param node          plan node under inspection
+        * @param operatorLabel operator name used in the ship-strategy message
+        *                      ("Reduce" / "Map")
+        * @param functionLabel name used in the property messages
+        *                      ("Reducer" / "Mapper")
+        */
+       private static void assertForwardedAndHashPartitionedOnField1(
+                       PlanNode node, String operatorLabel, String functionLabel) {
+               // the properties belong to the node, so they are read once rather than per channel
+               final GlobalProperties gprops = node.getGlobalProperties();
+               final LocalProperties lprops = node.getLocalProperties();
+
+               for (Channel input : node.getInputs()) {
+                       Assert.assertEquals(operatorLabel + " should just forward the input if it is already partitioned",
+                                       ShipStrategyType.FORWARD, input.getShipStrategy());
+                       Assert.assertTrue("Wrong GlobalProperties on " + functionLabel,
+                                       gprops.isPartitionedOnFields(new FieldSet(1)));
+                       Assert.assertEquals("Wrong GlobalProperties on " + functionLabel,
+                                       PartitioningProperty.HASH_PARTITIONED, gprops.getPartitioning());
+                       Assert.assertTrue("Wrong LocalProperties on " + functionLabel,
+                                       lprops.getGroupedFields().contains(1));
+               }
+       }
+
+       /**
+        * Placeholder mapper; returns {@code null}. The tests in this class only
+        * build and optimize plans, so it is presumably never invoked.
+        */
+       public static class MockMapper
+                       implements MapFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
+               @Override
+               public Tuple3<Integer, Integer, Integer> map(Tuple3<Integer, Integer, Integer> value) throws Exception {
+                       return null;
+               }
+       }
+
+       /**
+        * Placeholder reducer; returns {@code null}. The tests in this class only
+        * build and optimize plans, so it is presumably never invoked.
+        */
+       public static class MockReducer implements ReduceFunction<Tuple3<Integer, Integer, Integer>> {
+               @Override
+               public Tuple3<Integer, Integer, Integer> reduce(
+                               Tuple3<Integer, Integer, Integer> value1,
+                               Tuple3<Integer, Integer, Integer> value2) throws Exception {
+                       return null;
+               }
+       }
+
+       /**
+        * Placeholder join function; returns {@code null}. The tests in this class
+        * only build and optimize plans, so it is presumably never invoked.
+        */
+       public static class MockJoin implements JoinFunction<Tuple3<Integer, Integer, Integer>,
+                       Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
+
+               @Override
+               public Tuple3<Integer, Integer, Integer> join(
+                               Tuple3<Integer, Integer, Integer> first,
+                               Tuple3<Integer, Integer, Integer> second) throws Exception {
+                       return null;
+               }
+       }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
new file mode 100644
index 0000000..40b54e0
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+
+/**
+ * Plan-level checks on when a later grouping can reuse the partitioning and
+ * sort order established for an earlier one.
+ */
+@SuppressWarnings("serial")
+public class SortPartialReuseTest extends CompilerTestBase {
+
+       /**
+        * Hash-partitions on field 0, groups on (0, 1) and then on (0); the second
+        * group-reduce is expected to need neither a shuffle nor a local sort.
+        */
+       @Test
+       public void testPartialPartitioningReuse() {
+               try {
+                       final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       @SuppressWarnings("unchecked")
+                       DataSet<Tuple3<Long, Long, Long>> source =
+                                       env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+
+                       DataSet<Tuple3<Long, Long, Long>> mapped = source
+                                       .partitionByHash(0)
+                                       .map(new IdentityMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0", "1", "2");
+
+                       DataSet<Tuple3<Long, Long, Long>> reducedOnBothKeys = mapped
+                                       .groupBy(0, 1)
+                                       .reduceGroup(new IdentityGroupReducer<Tuple3<Long, Long, Long>>()).withForwardedFields("0", "1", "2");
+
+                       reducedOnBothKeys
+                                       .groupBy(0)
+                                       .reduceGroup(new IdentityGroupReducer<Tuple3<Long, Long, Long>>())
+                                       .print();
+
+                       final Plan program = env.createProgramPlan();
+                       final OptimizedPlan optimized = compileNoStats(program);
+
+                       // walk backwards from the single sink: sink <- reducer2 <- reducer1
+                       final SinkPlanNode sink = optimized.getDataSinks().iterator().next();
+                       final SingleInputPlanNode reducer2 = (SingleInputPlanNode) sink.getInput().getSource();
+                       final SingleInputPlanNode reducer1 = (SingleInputPlanNode) reducer2.getInput().getSource();
+
+                       assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+
+                       // second reducer: no re-partitioning and no re-sorting expected
+                       assertEquals(ShipStrategyType.FORWARD, reducer2.getInput().getShipStrategy());
+                       assertEquals(LocalStrategy.NONE, reducer2.getInput().getLocalStrategy());
+
+                       // first reducer: data is forwarded, but a combining sort is expected
+                       assertEquals(ShipStrategyType.FORWARD, reducer1.getInput().getShipStrategy());
+                       assertEquals(LocalStrategy.COMBININGSORT, reducer1.getInput().getLocalStrategy());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Custom-partitions on field 0, groups on (0, 1) and then on (1); the second
+        * group-reduce is expected to be preceded by a combiner, a hash
+        * re-partitioning and a new combining sort.
+        */
+       @Test
+       public void testCustomPartitioningNotReused() {
+               try {
+                       final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       @SuppressWarnings("unchecked")
+                       DataSet<Tuple3<Long, Long, Long>> source =
+                                       env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+
+                       // partitioner that assigns every key to partition 0
+                       final Partitioner<Long> allToZero = new Partitioner<Long>() {
+                               @Override
+                               public int partition(Long key, int numPartitions) { return 0; }
+                       };
+
+                       DataSet<Tuple3<Long, Long, Long>> mapped = source
+                                       .partitionCustom(allToZero, 0)
+                                       .map(new IdentityMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0", "1", "2");
+
+                       DataSet<Tuple3<Long, Long, Long>> reducedOnBothKeys = mapped
+                                       .groupBy(0, 1)
+                                       .reduceGroup(new IdentityGroupReducer<Tuple3<Long, Long, Long>>()).withForwardedFields("0", "1", "2");
+
+                       reducedOnBothKeys
+                                       .groupBy(1)
+                                       .reduceGroup(new IdentityGroupReducer<Tuple3<Long, Long, Long>>())
+                                       .print();
+
+                       final Plan program = env.createProgramPlan();
+                       final OptimizedPlan optimized = compileNoStats(program);
+
+                       // walk backwards from the single sink: sink <- reducer2 <- combiner <- reducer1
+                       final SinkPlanNode sink = optimized.getDataSinks().iterator().next();
+                       final SingleInputPlanNode reducer2 = (SingleInputPlanNode) sink.getInput().getSource();
+                       final SingleInputPlanNode combiner = (SingleInputPlanNode) reducer2.getInput().getSource();
+                       final SingleInputPlanNode reducer1 = (SingleInputPlanNode) combiner.getInput().getSource();
+
+                       assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+
+                       // second reducer: nothing is reused, so expect hash partitioning plus a combining sort
+                       assertEquals(ShipStrategyType.PARTITION_HASH, reducer2.getInput().getShipStrategy());
+                       assertEquals(LocalStrategy.COMBININGSORT, reducer2.getInput().getLocalStrategy());
+
+                       // the combiner is fed locally without any sort
+                       assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+                       assertEquals(LocalStrategy.NONE, combiner.getInput().getLocalStrategy());
+
+                       // first reducer: data is forwarded, but a combining sort is expected
+                       assertEquals(ShipStrategyType.FORWARD, reducer1.getInput().getShipStrategy());
+                       assertEquals(LocalStrategy.COMBININGSORT, reducer1.getInput().getLocalStrategy());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
new file mode 100644
index 0000000..92b4fc5
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.optimizer.plan.BinaryUnionPlanNode;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.junit.Test;
+
+/**
+ * Plan-level checks for a bulk iteration whose step function unions data
+ * from outside the iteration (static path) with the iteration's partial
+ * solution (dynamic path).
+ */
+@SuppressWarnings("serial")
+public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase {
+
+       /**
+        * The static union is the first operand of the mixed union in the program.
+        */
+       @Test
+       public void testUnionStaticFirst() {
+               try {
+                       final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       final DataSet<Long> input1 = env.generateSequence(1, 10);
+                       final DataSet<Long> input2 = env.generateSequence(1, 10);
+
+                       final IterativeDataSet<Long> iteration = input1.iterate(10);
+
+                       final DataSet<Long> staticPart = input2.union(input2);
+                       final DataSet<Long> dynamicPart = iteration.union(iteration);
+                       final DataSet<Long> result = iteration.closeWith(staticPart.union(dynamicPart));
+
+                       // printed twice on purpose: the plan is expected to have two sinks
+                       result.print();
+                       result.print();
+
+                       final Plan program = env.createProgramPlan();
+                       final OptimizedPlan optimized = compileNoStats(program);
+
+                       assertEquals(2, optimized.getDataSinks().size());
+
+                       final BulkIterationPlanNode iterPlan =
+                                       (BulkIterationPlanNode) optimized.getDataSinks().iterator().next().getInput().getSource();
+
+                       final SingleInputPlanNode noopNode = (SingleInputPlanNode) iterPlan.getRootOfStepFunction();
+                       final BinaryUnionPlanNode mixedUnion = (BinaryUnionPlanNode) noopNode.getInput().getSource();
+
+                       final Channel staticChannel = mixedUnion.getInput1();
+                       final Channel dynamicChannel = mixedUnion.getInput2();
+                       final NAryUnionPlanNode staticUnion = (NAryUnionPlanNode) staticChannel.getSource();
+                       final NAryUnionPlanNode dynamicUnion = (NAryUnionPlanNode) dynamicChannel.getSource();
+
+                       assertTrue(mixedUnion.unionsStaticAndDynamicPath());
+                       assertFalse(staticChannel.isOnDynamicPath());
+                       assertTrue(dynamicChannel.isOnDynamicPath());
+                       assertTrue(staticChannel.getTempMode().isCached());
+
+                       for (Channel in : staticUnion.getInputs()) {
+                               assertFalse(in.isOnDynamicPath());
+                       }
+                       for (Channel in : dynamicUnion.getInputs()) {
+                               assertTrue(in.isOnDynamicPath());
+                       }
+
+                       // expected relative memory: 0.5 for the iteration, 0.5 for the cached static input
+                       assertEquals(0.5, iterPlan.getRelativeMemoryPerSubTask(), 0.0);
+                       assertEquals(0.5, staticChannel.getRelativeTempMemory(), 0.0);
+                       assertEquals(0.0, dynamicChannel.getRelativeTempMemory(), 0.0);
+
+                       // job graph generation must succeed for this plan
+                       new JobGraphGenerator().compileJobGraph(optimized);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * The static union is the second operand of the mixed union in the program.
+        * NOTE(review): the assertions still expect the static path on input 1 of
+        * the plan node -- presumably the optimizer orders the inputs itself;
+        * confirm.
+        */
+       @Test
+       public void testUnionStaticSecond() {
+               try {
+                       final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       final DataSet<Long> input1 = env.generateSequence(1, 10);
+                       final DataSet<Long> input2 = env.generateSequence(1, 10);
+
+                       final IterativeDataSet<Long> iteration = input1.iterate(10);
+
+                       final DataSet<Long> dynamicPart = iteration.union(iteration);
+                       final DataSet<Long> staticPart = input2.union(input2);
+                       final DataSet<Long> iterResult = iteration.closeWith(dynamicPart.union(staticPart));
+
+                       // printed twice on purpose: the plan is expected to have two sinks
+                       iterResult.print();
+                       iterResult.print();
+
+                       final Plan program = env.createProgramPlan();
+                       final OptimizedPlan optimized = compileNoStats(program);
+
+                       assertEquals(2, optimized.getDataSinks().size());
+
+                       final BulkIterationPlanNode iterPlan =
+                                       (BulkIterationPlanNode) optimized.getDataSinks().iterator().next().getInput().getSource();
+
+                       final SingleInputPlanNode noopNode = (SingleInputPlanNode) iterPlan.getRootOfStepFunction();
+                       final BinaryUnionPlanNode mixedUnion = (BinaryUnionPlanNode) noopNode.getInput().getSource();
+
+                       final Channel staticChannel = mixedUnion.getInput1();
+                       final Channel dynamicChannel = mixedUnion.getInput2();
+                       final NAryUnionPlanNode staticUnion = (NAryUnionPlanNode) staticChannel.getSource();
+                       final NAryUnionPlanNode dynamicUnion = (NAryUnionPlanNode) dynamicChannel.getSource();
+
+                       assertTrue(mixedUnion.unionsStaticAndDynamicPath());
+                       assertFalse(staticChannel.isOnDynamicPath());
+                       assertTrue(dynamicChannel.isOnDynamicPath());
+                       assertTrue(staticChannel.getTempMode().isCached());
+
+                       // expected relative memory: 0.5 for the iteration, 0.5 for the cached static input
+                       assertEquals(0.5, iterPlan.getRelativeMemoryPerSubTask(), 0.0);
+                       assertEquals(0.5, staticChannel.getRelativeTempMemory(), 0.0);
+                       assertEquals(0.0, dynamicChannel.getRelativeTempMemory(), 0.0);
+
+                       for (Channel in : staticUnion.getInputs()) {
+                               assertFalse(in.isOnDynamicPath());
+                       }
+                       for (Channel in : dynamicUnion.getInputs()) {
+                               assertTrue(in.isOnDynamicPath());
+                       }
+
+                       // job graph generation must succeed for this plan
+                       new JobGraphGenerator().compileJobGraph(optimized);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
new file mode 100644
index 0000000..5d15ed8
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer;
+
+import java.util.Iterator;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.record.operators.FileDataSink;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Visitor;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.DummyInputFormat;
+import org.apache.flink.optimizer.util.DummyOutputFormat;
+import org.apache.flink.optimizer.util.IdentityReduce;
+
+
+@SuppressWarnings({"serial", "deprecation"})
+public class UnionPropertyPropagationTest extends CompilerTestBase {
+
+       /**
+        * Record-API variant: two keyed reducers are unioned into a third reducer on
+        * the same key. Each input of the first reduce node the visitor reaches is
+        * expected to use a FORWARD ship strategy.
+        */
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testUnionPropertyOldApiPropagation() {
+               // construct the plan
+
+               FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
+               FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
+
+               ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
+                       .input(sourceA)
+                       .build();
+               ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
+                       .input(sourceB)
+                       .build();
+
+               // the final reducer receives both pre-reduced branches as inputs
+               ReduceOperator globalRed = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).build();
+               globalRed.addInput(redA);
+               globalRed.addInput(redB);
+
+               FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, globalRed);
+
+               // wrap the sink into a plan and optimize it
+               Plan plan = new Plan(sink, "Union Property Propagation");
+
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               JobGraphGenerator jobGen = new JobGraphGenerator();
+
+               // Compile plan to verify that no error is thrown
+               jobGen.compileJobGraph(oPlan);
+
+               oPlan.accept(new Visitor<PlanNode>() {
+
+                       @Override
+                       public boolean preVisit(PlanNode visitable) {
+                               if (visitable instanceof SingleInputPlanNode
+                                               && visitable.getProgramOperator() instanceof ReduceOperator) {
+                                       for (Channel inConn : visitable.getInputs()) {
+                                               // assertEquals reports the offending strategy when the check fails
+                                               Assert.assertEquals("Reduce should just forward the input if it is already partitioned",
+                                                               ShipStrategyType.FORWARD, inConn.getShipStrategy());
+                                       }
+                                       // just check the latest reduce node: do not descend any further
+                                       return false;
+                               }
+                               return true;
+                       }
+
+                       @Override
+                       public void postVisit(PlanNode visitable) {
+                               // DO NOTHING
+                       }
+               });
+       }
+       
+       @Test
+       public void testUnionNewApiAssembly() {
+               final int NUM_INPUTS = 4;
+               
+               // construct the plan it will be multiple flat maps, all unioned
+               // and the "unioned" dataSet will be grouped
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               DataSet<String> source = env.readTextFile(IN_FILE);
+               DataSet<Tuple2<String, Integer>> lastUnion = source.flatMap(new 
DummyFlatMap());
+       
+               for (int i = 1; i< NUM_INPUTS; i++){
+                       lastUnion = lastUnion.union(source.flatMap(new 
DummyFlatMap()));
+               }
+               
+               DataSet<Tuple2<String, Integer>> result = 
lastUnion.groupBy(0).aggregate(Aggregations.SUM, 1);
+               result.writeAsText(OUT_FILE);
+       
+               // create the plan
+               Plan plan = env.createProgramPlan("Test union on new java-api");
+               OptimizedPlan oPlan = compileNoStats(plan);
+               JobGraphGenerator jobGen = new JobGraphGenerator();
+               
+               // Compile plan to verify that no error is thrown
+               jobGen.compileJobGraph(oPlan);
+               
+               oPlan.accept(new Visitor<PlanNode>() {
+                       
+                       @Override
+                       public boolean preVisit(PlanNode visitable) {
+                               
+                               /* Test the union's output connection:
+                                * the union must be the direct input of the group operator, and the 
ship strategy of that connection should be FORWARD
+                                */
+                               if (visitable instanceof SingleInputPlanNode && 
visitable.getProgramOperator() instanceof GroupReduceOperatorBase){
+                                       final Channel inConn = 
((SingleInputPlanNode) visitable).getInput();
+                                       Assert.assertTrue("Union should just 
forward the Partitioning",
+                                                       
inConn.getShipStrategy() == ShipStrategyType.FORWARD ); 
+                                       Assert.assertTrue("Union Node should be 
under Group operator",
+                                                       inConn.getSource() 
instanceof NAryUnionPlanNode );
+                               }
+                               
+                               /* Test the union's input connections:
+                                * there must be NUM_INPUTS input connections, all coming from 
FlatMapOperators, each with its own partitioning strategy (asserted below to be hash partitioning)
+                                */
+                               if (visitable instanceof NAryUnionPlanNode) {
+                                       int numberInputs = 0;
+                                       for (Iterator<Channel> inputs = 
visitable.getInputs().iterator(); inputs.hasNext(); numberInputs++) {
+                                               final Channel inConn = 
inputs.next();
+                                               PlanNode inNode = 
inConn.getSource();
+                                               Assert.assertTrue("Input of 
Union should be FlatMapOperators",
+                                                               
inNode.getProgramOperator() instanceof FlatMapOperatorBase);
+                                               Assert.assertTrue("Shipment 
strategy under union should partition the data",
+                                                               
inConn.getShipStrategy() == ShipStrategyType.PARTITION_HASH); 
+                                       }
+                                       
+                                       Assert.assertTrue("NAryUnion should 
have " + NUM_INPUTS + " inputs", numberInputs == NUM_INPUTS);
+                                       return false;
+                               }
+                               return true;
+                       }
+                       
+                       @Override
+                       public void postVisit(PlanNode visitable) {
+                               // DO NOTHING
+                       }
+               });
+       }
+
+       public static final class DummyFlatMap extends 
RichFlatMapFunction<String, Tuple2<String, Integer>> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void flatMap(String value, Collector<Tuple2<String, 
Integer>> out) {
+                       out.collect(new Tuple2<String, Integer>(value, 0));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
new file mode 100644
index 0000000..1e4124c
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("serial")
+public class UnionReplacementTest extends CompilerTestBase {
+
+       @Test
+       public void testUnionReplacement() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       DataSet<String> input1 = env.fromElements("test1");
+                       DataSet<String> input2 = env.fromElements("test2");
+       
+                       DataSet<String> union = input1.union(input2);
+       
+                       union.print();
+                       union.print();
+       
+                       Plan plan = env.createProgramPlan();
+                       OptimizedPlan oPlan = compileNoStats(plan);
+                       JobGraphGenerator jobGen = new JobGraphGenerator();
+                       jobGen.compileJobGraph(oPlan);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
new file mode 100644
index 0000000..80c0bda
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class WorksetIterationCornerCasesTest extends CompilerTestBase {
+
+       @Test
+       public void testWorksetIterationNotDependingOnSolutionSet() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       
+                       DataSet<Tuple2<Long, Long>> input = 
env.generateSequence(1, 100).map(new Duplicator<Long>());
+                       
+                       DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> 
iteration = input.iterateDelta(input, 100, 1);
+                       
+                       DataSet<Tuple2<Long, Long>> iterEnd = 
iteration.getWorkset().map(new TestMapper<Tuple2<Long,Long>>());
+                       iteration.closeWith(iterEnd, iterEnd).print();
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       WorksetIterationPlanNode wipn = 
(WorksetIterationPlanNode) 
op.getDataSinks().iterator().next().getInput().getSource();
+                       
assertTrue(wipn.getSolutionSetPlanNode().getOutgoingChannels().isEmpty());
+                       
+                       JobGraphGenerator jgg = new JobGraphGenerator();
+                       jgg.compileJobGraph(op);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       private static final class Duplicator<T> implements MapFunction<T, 
Tuple2<T, T>> {
+               @Override
+               public Tuple2<T, T> map(T value) {
+                       return new Tuple2<T, T>(value, value);
+               }
+       }
+       
+       private static final class TestMapper<T> implements MapFunction<T, T> {
+               @Override
+               public T map(T value) {
+                       return value;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
new file mode 100644
index 0000000..6e7c0a3
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.record.operators.DeltaIteration;
+import org.apache.flink.api.java.record.operators.FileDataSink;
+import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.record.operators.JoinOperator;
+import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.DummyInputFormat;
+import org.apache.flink.optimizer.util.DummyMatchStub;
+import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub;
+import org.apache.flink.optimizer.util.DummyOutputFormat;
+import org.apache.flink.optimizer.util.IdentityMap;
+import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.types.LongValue;
+import org.junit.Test;
+
+
+/**
+* Tests that validate optimizer choices when using operators that 
request specific execution strategies.
+* The plans under test are workset iterations built with the Record API.
+*/
+@SuppressWarnings("deprecation")
+public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
+       
+       private static final long serialVersionUID = 1L;
+       
+       private static final String ITERATION_NAME = "Test Workset Iteration";
+       private static final String JOIN_WITH_INVARIANT_NAME = "Test Join 
Invariant";
+       private static final String JOIN_WITH_SOLUTION_SET = "Test Join 
SolutionSet";
+       private static final String NEXT_WORKSET_REDUCER_NAME = "Test Reduce 
Workset";
+       private static final String SOLUTION_DELTA_MAPPER_NAME = "Test Map 
Delta";
+       
+       private final FieldList list0 = new FieldList(0);
+
+       @Test
+       public void testRecordApiWithDeferredSoltionSetUpdateWithMapper() {
+               Plan plan = getRecordTestPlan(false, true);
+               
+               OptimizedPlan oPlan;
+               try {
+                       oPlan = compileNoStats(plan);
+               } catch(CompilerException ce) {
+                       ce.printStackTrace();
+                       fail("The pact compiler is unable to compile this plan 
correctly.");
+                       return; // silence the compiler
+               }
+               
+               OptimizerPlanNodeResolver resolver = 
getOptimizerPlanNodeResolver(oPlan);
+               DualInputPlanNode joinWithInvariantNode = 
resolver.getNode(JOIN_WITH_INVARIANT_NAME);
+               DualInputPlanNode joinWithSolutionSetNode = 
resolver.getNode(JOIN_WITH_SOLUTION_SET);
+               SingleInputPlanNode worksetReducer = 
resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
+               SingleInputPlanNode deltaMapper = 
resolver.getNode(SOLUTION_DELTA_MAPPER_NAME);
+               
+               // iteration preserves partitioning in reducer, so the first 
partitioning is out of the loop, 
+               // the in-loop partitioning is before the final reducer
+               
+               // verify joinWithInvariant
+               assertEquals(ShipStrategyType.FORWARD, 
joinWithInvariantNode.getInput1().getShipStrategy()); 
+               assertEquals(ShipStrategyType.PARTITION_HASH, 
joinWithInvariantNode.getInput2().getShipStrategy());
+               assertEquals(list0, joinWithInvariantNode.getKeysForInput1());
+               assertEquals(list0, joinWithInvariantNode.getKeysForInput2());
+               
+               // verify joinWithSolutionSet
+               assertEquals(ShipStrategyType.FORWARD, 
joinWithSolutionSetNode.getInput1().getShipStrategy());
+               assertEquals(ShipStrategyType.FORWARD, 
joinWithSolutionSetNode.getInput2().getShipStrategy());
+               
+               // verify reducer
+               assertEquals(ShipStrategyType.PARTITION_HASH, 
worksetReducer.getInput().getShipStrategy());
+               assertEquals(list0, worksetReducer.getKeys(0));
+               
+               // currently, the system may partition before or after the 
mapper
+               ShipStrategyType ss1 = deltaMapper.getInput().getShipStrategy();
+               ShipStrategyType ss2 = 
deltaMapper.getOutgoingChannels().get(0).getShipStrategy();
+               
+               assertTrue( (ss1 == ShipStrategyType.FORWARD && ss2 == 
ShipStrategyType.PARTITION_HASH) ||
+                                       (ss2 == ShipStrategyType.FORWARD && ss1 
== ShipStrategyType.PARTITION_HASH) );
+               
+               new JobGraphGenerator().compileJobGraph(oPlan);
+       }
+       
+       @Test
+       public void 
testRecordApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() {
+               Plan plan = getRecordTestPlan(false, false);
+               
+               OptimizedPlan oPlan;
+               try {
+                       oPlan = compileNoStats(plan);
+               } catch(CompilerException ce) {
+                       ce.printStackTrace();
+                       fail("The pact compiler is unable to compile this plan 
correctly.");
+                       return; // silence the compiler
+               }
+               
+               OptimizerPlanNodeResolver resolver = 
getOptimizerPlanNodeResolver(oPlan);
+               DualInputPlanNode joinWithInvariantNode = 
resolver.getNode(JOIN_WITH_INVARIANT_NAME);
+               DualInputPlanNode joinWithSolutionSetNode = 
resolver.getNode(JOIN_WITH_SOLUTION_SET);
+               SingleInputPlanNode worksetReducer = 
resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
+               
+               // iteration preserves partitioning in reducer, so the first 
partitioning is out of the loop, 
+               // the in-loop partitioning is before the final reducer
+               
+               // verify joinWithInvariant
+               assertEquals(ShipStrategyType.FORWARD, 
joinWithInvariantNode.getInput1().getShipStrategy()); 
+               assertEquals(ShipStrategyType.PARTITION_HASH, 
joinWithInvariantNode.getInput2().getShipStrategy());
+               assertEquals(list0, joinWithInvariantNode.getKeysForInput1());
+               assertEquals(list0, joinWithInvariantNode.getKeysForInput2());
+               
+               // verify joinWithSolutionSet
+               assertEquals(ShipStrategyType.FORWARD, 
joinWithSolutionSetNode.getInput1().getShipStrategy());
+               assertEquals(ShipStrategyType.FORWARD, 
joinWithSolutionSetNode.getInput2().getShipStrategy());
+               
+               // verify reducer
+               assertEquals(ShipStrategyType.PARTITION_HASH, 
worksetReducer.getInput().getShipStrategy());
+               assertEquals(list0, worksetReducer.getKeys(0));
+               
+               
+               // verify solution delta
+               assertEquals(2, 
joinWithSolutionSetNode.getOutgoingChannels().size());
+               assertEquals(ShipStrategyType.PARTITION_HASH, 
joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
+               assertEquals(ShipStrategyType.PARTITION_HASH, 
joinWithSolutionSetNode.getOutgoingChannels().get(1).getShipStrategy());
+               
+               new JobGraphGenerator().compileJobGraph(oPlan);
+       }
+       
+       @Test
+       public void testRecordApiWithDirectSoltionSetUpdate() {
+               Plan plan = getRecordTestPlan(true, false);
+               
+               OptimizedPlan oPlan;
+               try {
+                       oPlan = compileNoStats(plan);
+               } catch(CompilerException ce) {
+                       ce.printStackTrace();
+                       fail("The pact compiler is unable to compile this plan 
correctly.");
+                       return; // silence the compiler
+               }
+               
+               OptimizerPlanNodeResolver resolver = 
getOptimizerPlanNodeResolver(oPlan);
+               DualInputPlanNode joinWithInvariantNode = 
resolver.getNode(JOIN_WITH_INVARIANT_NAME);
+               DualInputPlanNode joinWithSolutionSetNode = 
resolver.getNode(JOIN_WITH_SOLUTION_SET);
+               SingleInputPlanNode worksetReducer = 
resolver.getNode(NEXT_WORKSET_REDUCER_NAME);
+               
+               // iteration preserves partitioning in reducer, so the first 
partitioning is out of the loop; 
+               // in this variant the reducer input is expected to be FORWARD (no in-loop partitioning before it)
+               
+               // verify joinWithInvariant
+               assertEquals(ShipStrategyType.FORWARD, 
joinWithInvariantNode.getInput1().getShipStrategy()); 
+               assertEquals(ShipStrategyType.PARTITION_HASH, 
joinWithInvariantNode.getInput2().getShipStrategy());
+               assertEquals(list0, joinWithInvariantNode.getKeysForInput1());
+               assertEquals(list0, joinWithInvariantNode.getKeysForInput2());
+               
+               // verify joinWithSolutionSet
+               assertEquals(ShipStrategyType.FORWARD, 
joinWithSolutionSetNode.getInput1().getShipStrategy());
+               assertEquals(ShipStrategyType.FORWARD, 
joinWithSolutionSetNode.getInput2().getShipStrategy());
+               
+               // verify reducer
+               assertEquals(ShipStrategyType.FORWARD, 
worksetReducer.getInput().getShipStrategy());
+               assertEquals(list0, worksetReducer.getKeys(0));
+               
+               
+               // verify solution delta
+               assertEquals(1, 
joinWithSolutionSetNode.getOutgoingChannels().size());
+               assertEquals(ShipStrategyType.FORWARD, 
joinWithSolutionSetNode.getOutgoingChannels().get(0).getShipStrategy());
+               
+               new JobGraphGenerator().compileJobGraph(oPlan);
+       }
+       
+       private Plan getRecordTestPlan(boolean joinPreservesSolutionSet, 
boolean mapBeforeSolutionDelta) {
+               FileDataSource solutionSetInput = new FileDataSource(new 
DummyInputFormat(), IN_FILE, "Solution Set");
+               FileDataSource worksetInput = new FileDataSource(new 
DummyInputFormat(), IN_FILE, "Workset");
+               
+               FileDataSource invariantInput = new FileDataSource(new 
DummyInputFormat(), IN_FILE, "Invariant Input");
+               
+               DeltaIteration iteration = new DeltaIteration(0, 
ITERATION_NAME);
+               iteration.setInitialSolutionSet(solutionSetInput);
+               iteration.setInitialWorkset(worksetInput);
+               iteration.setMaximumNumberOfIterations(100);
+
+               JoinOperator joinWithInvariant = JoinOperator.builder(new 
DummyMatchStub(), LongValue.class, 0, 0)
+                               .input1(iteration.getWorkset())
+                               .input2(invariantInput)
+                               .name(JOIN_WITH_INVARIANT_NAME)
+                               .build();
+
+               JoinOperator joinWithSolutionSet = JoinOperator.builder(
+                               joinPreservesSolutionSet ? new DummyMatchStub() 
: new DummyNonPreservingMatchStub(), LongValue.class, 0, 0)
+                               .input1(iteration.getSolutionSet())
+                               .input2(joinWithInvariant)
+                               .name(JOIN_WITH_SOLUTION_SET)
+                               .build();
+               
+               ReduceOperator nextWorkset = ReduceOperator.builder(new 
IdentityReduce(), LongValue.class, 0)
+                               .input(joinWithSolutionSet)
+                               .name(NEXT_WORKSET_REDUCER_NAME)
+                               .build();
+               
+               if (mapBeforeSolutionDelta) {
+                       MapOperator mapper = MapOperator.builder(new 
IdentityMap())
+                               .input(joinWithSolutionSet)
+                               .name(SOLUTION_DELTA_MAPPER_NAME)
+                               .build();
+                       iteration.setSolutionSetDelta(mapper);
+               } else {
+                       iteration.setSolutionSetDelta(joinWithSolutionSet);
+               }
+               
+               iteration.setNextWorkset(nextWorkset);
+
+               FileDataSink sink = new FileDataSink(new DummyOutputFormat(), 
OUT_FILE, iteration, "Sink");
+               
+               Plan plan = new Plan(sink);
+               plan.setDefaultParallelism(DEFAULT_PARALLELISM);
+               return plan;
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/costs/DefaultCostEstimatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/costs/DefaultCostEstimatorTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/costs/DefaultCostEstimatorTest.java
new file mode 100644
index 0000000..af03eec
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/costs/DefaultCostEstimatorTest.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.costs;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.optimizer.dag.EstimateProvider;
+import org.junit.Test;
+
+/**
+ * Tests for the cost formulas in the {@link DefaultCostEstimator}. Most of the tests establish
+ * relative relationships between the costs of different strategies.
+ */
+public class DefaultCostEstimatorTest {
+       
+       // estimates
+       
+       // small input: estimated output size and number of records
+       private static final long SMALL_DATA_SIZE = 10000L;
+       private static final long SMALL_RECORD_COUNT = 100L;
+
+       // medium input: estimated output size and number of records
+       private static final long MEDIUM_DATA_SIZE = 500000000L;
+       private static final long MEDIUM_RECORD_COUNT = 500000L;
+
+       // big input: estimated output size and number of records
+       private static final long BIG_DATA_SIZE = 100000000000L;
+       private static final long BIG_RECORD_COUNT = 100000000L;
+
+       // estimate providers handed to the cost formulas under test
+       private static final EstimateProvider UNKNOWN_ESTIMATES = new UnknownEstimates();
+       private static final EstimateProvider ZERO_ESTIMATES = new Estimates(0L, 0L);
+       private static final EstimateProvider SMALL_ESTIMATES = new Estimates(SMALL_DATA_SIZE, SMALL_RECORD_COUNT);
+       private static final EstimateProvider MEDIUM_ESTIMATES = new Estimates(MEDIUM_DATA_SIZE, MEDIUM_RECORD_COUNT);
+       private static final EstimateProvider BIG_ESTIMATES = new Estimates(BIG_DATA_SIZE, BIG_RECORD_COUNT);
+
+       // the cost estimator whose formulas are exercised by all tests
+       private final CostEstimator costEstimator = new DefaultCostEstimator();
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * Runs the isolated ship strategy checks for the unknown, zero, small and big estimates,
+        * each first with a target parallelism of 1 and then with a target parallelism of 10.
+        */
+       @Test
+       public void testShipStrategiesIsolated() {
+               final EstimateProvider[] inputs = {
+                       UNKNOWN_ESTIMATES, ZERO_ESTIMATES, SMALL_ESTIMATES, BIG_ESTIMATES
+               };
+               final int[] parallelisms = { 1, 10 };
+
+               for (EstimateProvider input : inputs) {
+                       for (int parallelism : parallelisms) {
+                               testShipStrategiesIsolated(input, parallelism);
+                       }
+               }
+       }
+       
+       private void testShipStrategiesIsolated(EstimateProvider estimates, int 
targetParallelism) {
+               Costs random = new Costs();
+               costEstimator.addRandomPartitioningCost(estimates, random);
+               
+               Costs hash = new Costs();
+               costEstimator.addHashPartitioningCost(estimates, hash);
+               
+               Costs range = new Costs();
+               costEstimator.addRangePartitionCost(estimates, range);
+               
+               Costs broadcast = new Costs();
+               costEstimator.addBroadcastCost(estimates, targetParallelism, 
broadcast);
+               
+               int randomVsHash = random.compareTo(hash);
+               int hashVsRange = hash.compareTo(range);
+               int hashVsBroadcast = hash.compareTo(broadcast);
+               int rangeVsBroadcast = range.compareTo(broadcast);
+
+               // repartition random is at most as expensive as hash 
partitioning
+               assertTrue(randomVsHash <= 0);
+               
+               // range partitioning is always more expensive than hash 
partitioning
+               assertTrue(hashVsRange < 0);
+               
+               // broadcasting is always more expensive than hash partitioning
+               if (targetParallelism > 1) {
+                       assertTrue(hashVsBroadcast < 0);
+               } else {
+                       assertTrue(hashVsBroadcast <= 0);
+               }
+               
+               // range partitioning is not more expensive than broadcasting
+               if (targetParallelism > 1) {
+                       assertTrue(rangeVsBroadcast < 0);
+               }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * Compares combinations of ship strategies for two inputs, where {@code MEDIUM_ESTIMATES}
+        * plays the role of the smaller input and {@code BIG_ESTIMATES} the role of the larger one.
+        * An input is either hash partitioned, broadcast to 10 or 1000 targets, or forwarded; a
+        * forwarded input contributes no cost in this test.
+        */
+       @Test
+       public void testShipStrategyCombinationsPlain() {
+               final EstimateProvider smaller = MEDIUM_ESTIMATES;
+               final EstimateProvider larger = BIG_ESTIMATES;
+               final int fewTargets = 10;
+               final int manyTargets = 1000;
+
+               // hash partition both inputs
+               final Costs partitionBothSmaller = new Costs();
+               costEstimator.addHashPartitioningCost(smaller, partitionBothSmaller);
+               costEstimator.addHashPartitioningCost(smaller, partitionBothSmaller);
+
+               final Costs partitionSmallerAndLarger = new Costs();
+               costEstimator.addHashPartitioningCost(smaller, partitionSmallerAndLarger);
+               costEstimator.addHashPartitioningCost(larger, partitionSmallerAndLarger);
+
+               final Costs partitionBothLarger = new Costs();
+               costEstimator.addHashPartitioningCost(larger, partitionBothLarger);
+               costEstimator.addHashPartitioningCost(larger, partitionBothLarger);
+
+               // hash partition one input, broadcast the other one
+               final Costs partitionSmallerBcLargerFew = new Costs();
+               costEstimator.addHashPartitioningCost(smaller, partitionSmallerBcLargerFew);
+               costEstimator.addBroadcastCost(larger, fewTargets, partitionSmallerBcLargerFew);
+
+               final Costs partitionLargerBcSmallerFew = new Costs();
+               costEstimator.addHashPartitioningCost(larger, partitionLargerBcSmallerFew);
+               costEstimator.addBroadcastCost(smaller, fewTargets, partitionLargerBcSmallerFew);
+
+               final Costs partitionSmallerBcLargerMany = new Costs();
+               costEstimator.addHashPartitioningCost(smaller, partitionSmallerBcLargerMany);
+               costEstimator.addBroadcastCost(larger, manyTargets, partitionSmallerBcLargerMany);
+
+               final Costs partitionLargerBcSmallerMany = new Costs();
+               costEstimator.addHashPartitioningCost(larger, partitionLargerBcSmallerMany);
+               costEstimator.addBroadcastCost(smaller, manyTargets, partitionLargerBcSmallerMany);
+
+               // forward one input (no cost added), broadcast the other one
+               final Costs keepSmallerBcLargerFew = new Costs();
+               costEstimator.addBroadcastCost(larger, fewTargets, keepSmallerBcLargerFew);
+
+               final Costs keepLargerBcSmallerFew = new Costs();
+               costEstimator.addBroadcastCost(smaller, fewTargets, keepLargerBcSmallerFew);
+
+               final Costs keepSmallerBcLargerMany = new Costs();
+               costEstimator.addBroadcastCost(larger, manyTargets, keepSmallerBcLargerMany);
+
+               final Costs keepLargerBcSmallerMany = new Costs();
+               costEstimator.addBroadcastCost(smaller, manyTargets, keepLargerBcSmallerMany);
+
+               // hash cost is roughly monotonic in the input sizes
+               assertTrue(partitionBothSmaller.compareTo(partitionSmallerAndLarger) < 0);
+               assertTrue(partitionSmallerAndLarger.compareTo(partitionBothLarger) < 0);
+
+               // broadcasting the smaller input is better
+               assertTrue(partitionLargerBcSmallerFew.compareTo(partitionSmallerBcLargerFew) < 0);
+               assertTrue(keepLargerBcSmallerFew.compareTo(keepSmallerBcLargerFew) < 0);
+               assertTrue(partitionLargerBcSmallerMany.compareTo(partitionSmallerBcLargerMany) < 0);
+               assertTrue(keepLargerBcSmallerMany.compareTo(keepSmallerBcLargerMany) < 0);
+
+               // broadcasting small and forwarding large is better than partitioning both,
+               // given the size difference
+               assertTrue(keepLargerBcSmallerFew.compareTo(partitionSmallerAndLarger) < 0);
+
+               // broadcasting too far is expensive again
+               assertTrue(keepLargerBcSmallerMany.compareTo(partitionSmallerAndLarger) > 0);
+
+               // the number of broadcast targets is respected
+               assertTrue(partitionSmallerBcLargerFew.compareTo(partitionSmallerBcLargerMany) < 0);
+               assertTrue(partitionLargerBcSmallerFew.compareTo(partitionLargerBcSmallerMany) < 0);
+               assertTrue(keepSmallerBcLargerFew.compareTo(keepSmallerBcLargerMany) < 0);
+               assertTrue(keepLargerBcSmallerFew.compareTo(keepLargerBcSmallerMany) < 0);
+
+               // forward versus hash
+               assertTrue(keepSmallerBcLargerFew.compareTo(partitionSmallerBcLargerFew) < 0);
+               assertTrue(keepSmallerBcLargerMany.compareTo(partitionSmallerBcLargerMany) < 0);
+               assertTrue(keepLargerBcSmallerFew.compareTo(partitionLargerBcSmallerFew) < 0);
+               assertTrue(keepLargerBcSmallerMany.compareTo(partitionLargerBcSmallerMany) < 0);
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * Runs the "one input has unknown estimates" checks once for each of the unknown, zero,
+        * small, medium and big estimates as the other input.
+        */
+       @Test
+       public void testShipStrategyCombinationsWithUnknowns() {
+               final EstimateProvider[] otherInputs = {
+                       UNKNOWN_ESTIMATES, ZERO_ESTIMATES, SMALL_ESTIMATES, MEDIUM_ESTIMATES, BIG_ESTIMATES
+               };
+
+               for (EstimateProvider other : otherInputs) {
+                       testShipStrategyCombinationsWithUnknowns(other);
+               }
+       }
+       
+       private void testShipStrategyCombinationsWithUnknowns(EstimateProvider 
knownEstimates) {
+               Costs hashBoth = new Costs();
+               Costs bcKnown10 = new Costs();
+               Costs bcUnknown10 = new Costs();
+               Costs bcKnown1000 = new Costs();
+               Costs bcUnknown1000 = new Costs();
+               
+               costEstimator.addHashPartitioningCost(knownEstimates, hashBoth);
+               costEstimator.addHashPartitioningCost(UNKNOWN_ESTIMATES, 
hashBoth);
+               
+               costEstimator.addBroadcastCost(knownEstimates, 10, bcKnown10);
+               
+               costEstimator.addBroadcastCost(UNKNOWN_ESTIMATES, 10, 
bcUnknown10);
+               
+               costEstimator.addBroadcastCost(knownEstimates, 1000, 
bcKnown1000);
+               
+               costEstimator.addBroadcastCost(UNKNOWN_ESTIMATES, 1000, 
bcUnknown1000);
+               
+               // if we do not know one of them, hashing both should be 
cheaper than anything
+               assertTrue(hashBoth.compareTo(bcKnown10) < 0);
+               assertTrue(hashBoth.compareTo(bcUnknown10) < 0);
+               assertTrue(hashBoth.compareTo(bcKnown1000) < 0);
+               assertTrue(hashBoth.compareTo(bcUnknown1000) < 0);
+               
+               // there should be no bias in broadcasting a known or unknown 
size input
+               assertTrue(bcKnown10.compareTo(bcUnknown10) == 0);
+               assertTrue(bcKnown1000.compareTo(bcUnknown1000) == 0);
+               
+               // replication factor does matter
+               assertTrue(bcKnown10.compareTo(bcKnown1000) < 0);
+               assertTrue(bcUnknown10.compareTo(bcUnknown1000) < 0);
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * Checks the local join cost formulas with a weight of 1: hybrid hash costs for the four
+        * combinations of small and big inputs, merge join costs (one local sort per input plus
+        * the local merge), and finally hash join costs against merge join costs.
+        */
+       @Test
+       public void testJoinCostFormulasPlain() {
+               final EstimateProvider small = SMALL_ESTIMATES;
+               final EstimateProvider big = BIG_ESTIMATES;
+
+               // ---- hash join costs ----
+
+               final Costs hashSmallFirst = new Costs();
+               costEstimator.addHybridHashCosts(small, big, hashSmallFirst, 1);
+
+               final Costs hashBigFirst = new Costs();
+               costEstimator.addHybridHashCosts(big, small, hashBigFirst, 1);
+
+               final Costs hashSmallSmall = new Costs();
+               costEstimator.addHybridHashCosts(small, small, hashSmallSmall, 1);
+
+               final Costs hashBigBig = new Costs();
+               costEstimator.addHybridHashCosts(big, big, hashBigBig, 1);
+
+               assertTrue(hashSmallSmall.compareTo(hashSmallFirst) < 0);
+               assertTrue(hashSmallFirst.compareTo(hashBigFirst) < 0);
+               assertTrue(hashBigFirst.compareTo(hashBigBig) < 0);
+
+               // ---- merge join costs ----
+
+               final Costs mergeSmallThenBig = new Costs();
+               costEstimator.addLocalSortCost(small, mergeSmallThenBig);
+               costEstimator.addLocalSortCost(big, mergeSmallThenBig);
+               costEstimator.addLocalMergeCost(small, big, mergeSmallThenBig, 1);
+
+               final Costs mergeBigThenSmall = new Costs();
+               costEstimator.addLocalSortCost(big, mergeBigThenSmall);
+               costEstimator.addLocalSortCost(small, mergeBigThenSmall);
+               costEstimator.addLocalMergeCost(big, small, mergeBigThenSmall, 1);
+
+               final Costs mergeSmallSmall = new Costs();
+               costEstimator.addLocalSortCost(small, mergeSmallSmall);
+               costEstimator.addLocalSortCost(small, mergeSmallSmall);
+               costEstimator.addLocalMergeCost(small, small, mergeSmallSmall, 1);
+
+               final Costs mergeBigBig = new Costs();
+               costEstimator.addLocalSortCost(big, mergeBigBig);
+               costEstimator.addLocalSortCost(big, mergeBigBig);
+               costEstimator.addLocalMergeCost(big, big, mergeBigBig, 1);
+
+               assertTrue(mergeSmallSmall.compareTo(mergeSmallThenBig) < 0);
+               assertTrue(mergeSmallSmall.compareTo(mergeBigThenSmall) < 0);
+               // the order of the two inputs makes no difference for the merge join
+               assertTrue(mergeSmallThenBig.compareTo(mergeBigThenSmall) == 0);
+               assertTrue(mergeSmallThenBig.compareTo(mergeBigBig) < 0);
+               assertTrue(mergeBigThenSmall.compareTo(mergeBigBig) < 0);
+
+               // ---- merge join versus hash join costs ----
+
+               assertTrue(hashSmallSmall.compareTo(mergeSmallSmall) < 0);
+               assertTrue(hashBigBig.compareTo(mergeBigBig) < 0);
+               assertTrue(hashSmallFirst.compareTo(mergeSmallThenBig) < 0);
+               assertTrue(hashSmallFirst.compareTo(mergeBigThenSmall) < 0);
+               assertTrue(hashBigFirst.compareTo(mergeSmallThenBig) < 0);
+               assertTrue(hashBigFirst.compareTo(mergeBigThenSmall) < 0);
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       @Test
+       public void testJoinCostFormulasWithWeights() {
+               testJoinCostFormulasWithWeights(UNKNOWN_ESTIMATES, 
SMALL_ESTIMATES);
+               testJoinCostFormulasWithWeights(SMALL_ESTIMATES, 
UNKNOWN_ESTIMATES);
+               testJoinCostFormulasWithWeights(UNKNOWN_ESTIMATES, 
MEDIUM_ESTIMATES);
+               testJoinCostFormulasWithWeights(MEDIUM_ESTIMATES, 
UNKNOWN_ESTIMATES);
+               testJoinCostFormulasWithWeights(BIG_ESTIMATES, 
MEDIUM_ESTIMATES);
+               testJoinCostFormulasWithWeights(MEDIUM_ESTIMATES, 
BIG_ESTIMATES);
+       }
+       
+       private void testJoinCostFormulasWithWeights(EstimateProvider e1, 
EstimateProvider e2) {
+               Costs hf1 = new Costs();
+               Costs hf5 = new Costs();
+               Costs hs1 = new Costs();
+               Costs hs5 = new Costs();
+               Costs mm1 = new Costs();
+               Costs mm5 = new Costs();
+               
+               costEstimator.addHybridHashCosts(e1, e2, hf1, 1);
+               costEstimator.addHybridHashCosts(e1, e2, hf5, 5);
+               costEstimator.addHybridHashCosts(e2, e1, hs1, 1);
+               costEstimator.addHybridHashCosts(e2, e1, hs5, 5);
+               
+               costEstimator.addLocalSortCost(e1, mm1);
+               costEstimator.addLocalSortCost(e2, mm1);
+               costEstimator.addLocalMergeCost(e1, e2, mm1, 1);
+               
+               costEstimator.addLocalSortCost(e1, mm5);
+               costEstimator.addLocalSortCost(e2, mm5);
+               mm5.multiplyWith(5);
+               costEstimator.addLocalMergeCost(e1, e2, mm5, 5);
+               
+               // weight 1 versus weight 5
+               assertTrue(hf1.compareTo(hf5) < 0);
+               assertTrue(hs1.compareTo(hs5) < 0);
+               assertTrue(mm1.compareTo(mm5) < 0);
+               
+               // hash versus merge
+               assertTrue(hf1.compareTo(mm1) < 0);
+               assertTrue(hs1.compareTo(mm1) < 0);
+               assertTrue(hf5.compareTo(mm5) < 0);
+               assertTrue(hs5.compareTo(mm5) < 0);
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * Compares the plain hybrid hash join costs with the cached hybrid hash join costs for
+        * several input combinations, at a weight of 10 and (for one combination) a weight of 1.
+        */
+       @Test
+       public void testHashJoinCostFormulasWithCaches() {
+               final EstimateProvider unknown = UNKNOWN_ESTIMATES;
+               final EstimateProvider medium = MEDIUM_ESTIMATES;
+               final EstimateProvider big = BIG_ESTIMATES;
+
+               // both inputs unknown, weight 10
+               final Costs plainUnknown10 = new Costs();
+               final Costs cachedUnknown10 = new Costs();
+               costEstimator.addHybridHashCosts(unknown, unknown, plainUnknown10, 10);
+               costEstimator.addCachedHybridHashCosts(unknown, unknown, cachedUnknown10, 10);
+
+               // both inputs medium, weight 10
+               final Costs plainMedium10 = new Costs();
+               final Costs cachedMedium10 = new Costs();
+               costEstimator.addHybridHashCosts(medium, medium, plainMedium10, 10);
+               costEstimator.addCachedHybridHashCosts(medium, medium, cachedMedium10, 10);
+
+               // medium input first, big input second, weight 10
+               final Costs plainMediumBig10 = new Costs();
+               final Costs cachedMediumBig10 = new Costs();
+               costEstimator.addHybridHashCosts(medium, big, plainMediumBig10, 10);
+               costEstimator.addCachedHybridHashCosts(medium, big, cachedMediumBig10, 10);
+
+               // big input first, medium input second, weight 10
+               final Costs plainBigMedium10 = new Costs();
+               final Costs cachedBigMedium10 = new Costs();
+               costEstimator.addHybridHashCosts(big, medium, plainBigMedium10, 10);
+               costEstimator.addCachedHybridHashCosts(big, medium, cachedBigMedium10, 10);
+
+               // big input first, medium input second, weight 1
+               final Costs plainBigMedium1 = new Costs();
+               final Costs cachedBigMedium1 = new Costs();
+               costEstimator.addHybridHashCosts(big, medium, plainBigMedium1, 1);
+               costEstimator.addCachedHybridHashCosts(big, medium, cachedBigMedium1, 1);
+
+               // cached variant is always cheaper
+               assertTrue(plainUnknown10.compareTo(cachedUnknown10) > 0);
+               assertTrue(plainMedium10.compareTo(cachedMedium10) > 0);
+               assertTrue(plainMediumBig10.compareTo(cachedMediumBig10) > 0);
+               assertTrue(plainBigMedium10.compareTo(cachedBigMedium10) > 0);
+
+               // caching the large side is better, because then the small one is the one with
+               // additional I/O
+               assertTrue(cachedBigMedium10.compareTo(cachedMediumBig10) < 0);
+
+               // a weight of one makes the caching the same as the non-cached variant
+               assertTrue(plainBigMedium1.compareTo(cachedBigMedium1) == 0);
+       }
+       
+       
+       // 
--------------------------------------------------------------------------------------------
+       //  Estimate providers
+       // 
--------------------------------------------------------------------------------------------
+       
+       private static final class UnknownEstimates implements EstimateProvider 
{
+
+               @Override
+               public long getEstimatedOutputSize() { return -1; }
+
+               @Override
+               public long getEstimatedNumRecords() { return -1; }
+
+               @Override
+               public float getEstimatedAvgWidthPerOutputRecord() { return 
-1.0f; }
+       }
+       
+       /**
+        * An estimate provider that reports the fixed values it was constructed with.
+        */
+       private static final class Estimates implements EstimateProvider {
+
+               private final long outputSize;
+               private final long numRecords;
+               private final float avgRecordWidth;
+
+               /**
+                * Creates estimates with the given size and record count; the average record width
+                * is set to -1.
+                */
+               public Estimates(long size, long records) {
+                       this(size, records, -1.0f);
+               }
+
+               /**
+                * Creates estimates with the given size, record count and average record width.
+                */
+               public Estimates(long size, long records, float width) {
+                       outputSize = size;
+                       numRecords = records;
+                       avgRecordWidth = width;
+               }
+
+               @Override
+               public long getEstimatedOutputSize() {
+                       return outputSize;
+               }
+
+               @Override
+               public long getEstimatedNumRecords() {
+                       return numRecords;
+               }
+
+               @Override
+               public float getEstimatedAvgWidthPerOutputRecord() {
+                       return avgRecordWidth;
+               }
+       }
+}

Reply via email to