http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
new file mode 100644
index 0000000..e65758f
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
+import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.junit.Test;
+
+@SuppressWarnings({"serial", "unchecked"})
+public class NestedIterationsTest extends CompilerTestBase {
+
+       @Test
+       public void testRejectNestedBulkIterations() {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       DataSet<Long> data = env.generateSequence(1, 100);
+
+                       IterativeDataSet<Long> outerIteration = data.iterate(100);
+
+                       IterativeDataSet<Long> innerIteration = outerIteration.map(new IdentityMapper<Long>()).iterate(100);
+
+                       DataSet<Long> innerResult = innerIteration.closeWith(innerIteration.map(new IdentityMapper<Long>()));
+
+                       DataSet<Long> outerResult = outerIteration.closeWith(innerResult.map(new IdentityMapper<Long>()));
+
+                       outerResult.print();
+
+                       Plan p = env.createProgramPlan();
+
+                       try {
+                               compileNoStats(p);
+                       }
+                       catch (CompilerException e) {
+                               assertTrue(e.getMessage().toLowerCase().indexOf("nested iterations") != -1);
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testRejectNestedWorksetIterations() {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       DataSet<Tuple2<Long, Long>> data = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+
+                       DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> outerIteration = data.iterateDelta(data, 100, 0);
+
+                       DataSet<Tuple2<Long, Long>> inOuter = outerIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+
+                       DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> innerIteration = inOuter.iterateDelta(inOuter, 100, 0);
+
+                       DataSet<Tuple2<Long, Long>> inInner = innerIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+
+                       DataSet<Tuple2<Long, Long>> innerResult = innerIteration.closeWith(inInner, inInner).map(new IdentityMapper<Tuple2<Long,Long>>());
+
+                       DataSet<Tuple2<Long, Long>> outerResult = outerIteration.closeWith(innerResult, innerResult);
+
+                       outerResult.print();
+
+                       Plan p = env.createProgramPlan();
+
+                       try {
+                               compileNoStats(p);
+                       }
+                       catch (CompilerException e) {
+                               assertTrue(e.getMessage().toLowerCase().indexOf("nested iterations") != -1);
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testBulkIterationInClosure() {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       DataSet<Long> data1 = env.generateSequence(1, 100);
+                       DataSet<Long> data2 = env.generateSequence(1, 100);
+
+                       IterativeDataSet<Long> firstIteration = data1.iterate(100);
+
+                       DataSet<Long> firstResult = firstIteration.closeWith(firstIteration.map(new IdentityMapper<Long>()));
+
+
+                       IterativeDataSet<Long> mainIteration = data2.map(new IdentityMapper<Long>()).iterate(100);
+
+                       DataSet<Long> joined = mainIteration.join(firstResult)
+                                       .where(new IdentityKeyExtractor<Long>()).equalTo(new IdentityKeyExtractor<Long>())
+                                       .with(new DummyFlatJoinFunction<Long>());
+
+                       DataSet<Long> mainResult = mainIteration.closeWith(joined);
+                       
+                       mainResult.print();
+                       
+                       Plan p = env.createProgramPlan();
+                       
+                       // optimizer should be able to translate this
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       // job graph generator should be able to translate this
+                       new JobGraphGenerator().compileJobGraph(op);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testDeltaIterationInClosure() {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       DataSet<Tuple2<Long, Long>> data1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+                       DataSet<Tuple2<Long, Long>> data2 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+
+                       DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> firstIteration = data1.iterateDelta(data1, 100, 0);
+
+                       DataSet<Tuple2<Long, Long>> inFirst = firstIteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+
+                       DataSet<Tuple2<Long, Long>> firstResult = firstIteration.closeWith(inFirst, inFirst).map(new IdentityMapper<Tuple2<Long,Long>>());
+
+
+                       DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> mainIteration = data2.iterateDelta(data2, 100, 0);
+
+                       DataSet<Tuple2<Long, Long>> joined = mainIteration.getWorkset().join(firstResult).where(0).equalTo(0)
+                                       .projectFirst(0).projectSecond(0);
+
+                       DataSet<Tuple2<Long, Long>> mainResult = mainIteration.closeWith(joined, joined);
+                       
+                       mainResult.print();
+                       
+                       Plan p = env.createProgramPlan();
+                       
+                       // optimizer should be able to translate this
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       // job graph generator should be able to translate this
+                       new JobGraphGenerator().compileJobGraph(op);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
new file mode 100644
index 0000000..2b42f85
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class PartitionPushdownTest extends CompilerTestBase {
+
+       @Test
+       public void testPartitioningNotPushedDown() {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       @SuppressWarnings("unchecked")
+                       DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+                       
+                       input
+                               .groupBy(0, 1).sum(2)
+                               .groupBy(0).sum(1)
+                               .print();
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       SinkPlanNode sink = op.getDataSinks().iterator().next();
+                       
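+                       // the hash partitioning on (0) needed by the second aggregation is not pushed down below the
+                       // first aggregation: the plan partitions on (0, 1) first and re-partitions on (0) afterwards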
+                       SingleInputPlanNode agg2Reducer = (SingleInputPlanNode) sink.getInput().getSource();
+                       SingleInputPlanNode agg2Combiner = (SingleInputPlanNode) agg2Reducer.getInput().getSource();
+                       SingleInputPlanNode agg1Reducer = (SingleInputPlanNode) agg2Combiner.getInput().getSource();
+
+                       assertEquals(ShipStrategyType.PARTITION_HASH, agg2Reducer.getInput().getShipStrategy());
+                       assertEquals(new FieldList(0), agg2Reducer.getInput().getShipStrategyKeys());
+
+                       assertEquals(ShipStrategyType.FORWARD, agg2Combiner.getInput().getShipStrategy());
+
+                       assertEquals(ShipStrategyType.PARTITION_HASH, agg1Reducer.getInput().getShipStrategy());
+                       assertEquals(new FieldList(0, 1), agg1Reducer.getInput().getShipStrategyKeys());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testPartitioningReused() {
+               try {
+                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       @SuppressWarnings("unchecked")
+                       DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+                       
+                       input
+                               .groupBy(0).sum(1)
+                               .groupBy(0, 1).sum(2)
+                               .print();
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       SinkPlanNode sink = op.getDataSinks().iterator().next();
+                       
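+                       // the hash partitioning on (0) created for the first aggregation also covers the grouping
+                       // on (0, 1), so the second aggregation can reuse it and reads a FORWARD input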
+                       SingleInputPlanNode agg2Reducer = (SingleInputPlanNode) sink.getInput().getSource();
+                       SingleInputPlanNode agg1Reducer = (SingleInputPlanNode) agg2Reducer.getInput().getSource();
+
+                       assertEquals(ShipStrategyType.FORWARD, agg2Reducer.getInput().getShipStrategy());
+
+                       assertEquals(ShipStrategyType.PARTITION_HASH, agg1Reducer.getInput().getShipStrategy());
+                       assertEquals(new FieldList(0), agg1Reducer.getInput().getShipStrategyKeys());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
new file mode 100644
index 0000000..16684dc
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
@@ -0,0 +1,845 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("serial")
+public class PartitioningReusageTest extends CompilerTestBase {
+
+       @Test
+       public void noPreviousPartitioningJoin1() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,1).equalTo(0,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+
+       }
+
+       @Test
+       public void noPreviousPartitioningJoin2() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,1).equalTo(2,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+
+       }
+
+       @Test
+       public void reuseSinglePartitioningJoin1() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .partitionByHash(0,1)
+                               .map(new MockMapper()).withForwardedFields("0;1")
+                               .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,1).equalTo(0,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+
+       }
+
+       @Test
+       public void reuseSinglePartitioningJoin2() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .partitionByHash(0,1)
+                               .map(new MockMapper()).withForwardedFields("0;1")
+                               .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,1).equalTo(2,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+       @Test
+       public void reuseSinglePartitioningJoin3() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .join(set2.partitionByHash(2, 1)
+                                                       .map(new MockMapper())
+                                                       .withForwardedFields("2;1"),
+                                               JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,1).equalTo(2,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+       @Test
+       public void reuseSinglePartitioningJoin4() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .partitionByHash(0)
+                               .map(new MockMapper()).withForwardedFields("0")
+                               .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,1).equalTo(2,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+       @Test
+       public void reuseSinglePartitioningJoin5() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .join(set2.partitionByHash(2)
+                                                       .map(new MockMapper())
+                                                       .withForwardedFields("2"),
+                                               JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,1).equalTo(2,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+       @Test
+       public void reuseBothPartitioningJoin1() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .partitionByHash(0,1)
+                               .map(new MockMapper()).withForwardedFields("0;1")
+                               .join(set2.partitionByHash(0,1)
+                                                       .map(new MockMapper())
+                                                       .withForwardedFields("0;1"),
+                                               JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,1).equalTo(0,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+
+       @Test
+       public void reuseBothPartitioningJoin2() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .partitionByHash(0,1)
+                               .map(new MockMapper()).withForwardedFields("0;1")
+                               .join(set2.partitionByHash(1,2)
+                                                       .map(new MockMapper())
+                                                       .withForwardedFields("1;2"),
+                                               JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,1).equalTo(2,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+       @Test
+       public void reuseBothPartitioningJoin3() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .partitionByHash(0)
+                               .map(new MockMapper()).withForwardedFields("0")
+                               .join(set2.partitionByHash(2,1)
+                                                       .map(new MockMapper())
+                                                       .withForwardedFields("2;1"),
+                                               JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,1).equalTo(2,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+       @Test
+       public void reuseBothPartitioningJoin4() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .partitionByHash(0,2)
+                               .map(new MockMapper()).withForwardedFields("0;2")
+                               .join(set2.partitionByHash(1)
+                                                       .map(new MockMapper())
+                                                       .withForwardedFields("1"),
+                                               JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,2).equalTo(2,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+       @Test
+       public void reuseBothPartitioningJoin5() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .partitionByHash(2)
+                               .map(new MockMapper()).withForwardedFields("2")
+                               .join(set2.partitionByHash(1)
+                                                       .map(new MockMapper())
+                                                       .withForwardedFields("1"),
+                                               JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,2).equalTo(2,1).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+       @Test
+       public void reuseBothPartitioningJoin6() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .partitionByHash(0)
+                               .map(new MockMapper()).withForwardedFields("0")
+                               .join(set2.partitionByHash(1)
+                                                       .map(new MockMapper())
+                                                       .withForwardedFields("1"),
+                                               JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,2).equalTo(1,2).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+       @Test
+       public void reuseBothPartitioningJoin7() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+                               .partitionByHash(2)
+                               .map(new MockMapper()).withForwardedFields("2")
+                               .join(set2.partitionByHash(1)
+                                                       .map(new MockMapper())
+                                                       .withForwardedFields("1"),
+                                               JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+                               .where(0,2).equalTo(1,2).with(new MockJoin());
+
+               joined.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidJoinInputProperties(join);
+       }
+
+
+       @Test
+       public void noPreviousPartitioningCoGroup1() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .coGroup(set2)
+                               .where(0,1).equalTo(0,1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+
+       }
+
+       @Test
+       public void noPreviousPartitioningCoGroup2() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .coGroup(set2)
+                               .where(0,1).equalTo(2,1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+
+       }
+
+       @Test
+       public void reuseSinglePartitioningCoGroup1() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .partitionByHash(0,1)
+                               .map(new MockMapper()).withForwardedFields("0;1")
+                               .coGroup(set2)
+                               .where(0,1).equalTo(0,1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+
+       }
+
+       @Test
+       public void reuseSinglePartitioningCoGroup2() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .partitionByHash(0,1)
+                               .map(new MockMapper()).withForwardedFields("0;1")
+                               .coGroup(set2)
+                               .where(0,1).equalTo(2,1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+       @Test
+       public void reuseSinglePartitioningCoGroup3() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .coGroup(set2.partitionByHash(2, 1)
+                                               .map(new MockMapper())
+                                               .withForwardedFields("2;1"))
+                               .where(0,1).equalTo(2, 1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+       @Test
+       public void reuseSinglePartitioningCoGroup4() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .partitionByHash(0)
+                               .map(new MockMapper()).withForwardedFields("0")
+                               .coGroup(set2)
+                               .where(0, 1).equalTo(2, 1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+       @Test
+       public void reuseSinglePartitioningCoGroup5() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .coGroup(set2.partitionByHash(2)
+                                               .map(new MockMapper())
+                                               .withForwardedFields("2"))
+                               .where(0,1).equalTo(2,1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+       @Test
+       public void reuseBothPartitioningCoGroup1() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .partitionByHash(0,1)
+                               .map(new MockMapper()).withForwardedFields("0;1")
+                               .coGroup(set2.partitionByHash(0, 1)
+                                               .map(new MockMapper())
+                                               .withForwardedFields("0;1"))
+                               .where(0, 1).equalTo(0, 1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+
+       @Test
+       public void reuseBothPartitioningCoGroup2() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .partitionByHash(0,1)
+                               .map(new MockMapper()).withForwardedFields("0;1")
+                               .coGroup(set2.partitionByHash(1, 2)
+                                               .map(new MockMapper())
+                                               .withForwardedFields("1;2"))
+                               .where(0, 1).equalTo(2, 1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+       @Test
+       public void reuseBothPartitioningCoGroup3() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .partitionByHash(0)
+                               .map(new MockMapper()).withForwardedFields("0")
+                               .coGroup(set2.partitionByHash(2, 1)
+                                               .map(new MockMapper())
+                                               .withForwardedFields("2;1"))
+                               .where(0, 1).equalTo(2, 1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+       @Test
+       public void reuseBothPartitioningCoGroup4() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .partitionByHash(0,2)
+                               .map(new MockMapper()).withForwardedFields("0;2")
+                               .coGroup(set2.partitionByHash(1)
+                                               .map(new MockMapper())
+                                               .withForwardedFields("1"))
+                               .where(0, 2).equalTo(2, 1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+       @Test
+       public void reuseBothPartitioningCoGroup5() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .partitionByHash(2)
+                               .map(new MockMapper()).withForwardedFields("2")
+                               .coGroup(set2.partitionByHash(1)
+                                               .map(new MockMapper())
+                                               .withForwardedFields("1"))
+                               .where(0, 2).equalTo(2, 1).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+       @Test
+       public void reuseBothPartitioningCoGroup6() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .partitionByHash(2)
+                               .map(new MockMapper()).withForwardedFields("2")
+                               .coGroup(set2.partitionByHash(2)
+                                               .map(new MockMapper())
+                                               .withForwardedFields("2"))
+                               .where(0, 2).equalTo(1, 2).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+       @Test
+       public void reuseBothPartitioningCoGroup7() {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+               DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+               DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+                               .partitionByHash(2)
+                               .map(new MockMapper()).withForwardedFields("2")
+                               .coGroup(set2.partitionByHash(1)
+                                               .map(new MockMapper())
+                                               .withForwardedFields("1"))
+                               .where(0, 2).equalTo(1, 2).with(new MockCoGroup());
+
+               coGrouped.print();
+               JavaPlan plan = env.createProgramPlan();
+               OptimizedPlan oPlan = compileWithStats(plan);
+
+               SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+               DualInputPlanNode coGroup = (DualInputPlanNode)sink.getInput().getSource();
+
+               checkValidCoGroupInputProperties(coGroup);
+       }
+
+
+
+       private void checkValidJoinInputProperties(DualInputPlanNode join) {
+
+               GlobalProperties inProps1 = 
join.getInput1().getGlobalProperties();
+               GlobalProperties inProps2 = 
join.getInput2().getGlobalProperties();
+
+               if(inProps1.getPartitioning() == 
PartitioningProperty.HASH_PARTITIONED &&
+                               inProps2.getPartitioning() == 
PartitioningProperty.HASH_PARTITIONED) {
+
+                       // check that both inputs are hash partitioned on the 
same fields
+                       FieldList pFields1 = inProps1.getPartitioningFields();
+                       FieldList pFields2 = inProps2.getPartitioningFields();
+
+                       assertTrue("Inputs are not the same number of fields. 
Input 1: "+pFields1+", Input 2: "+pFields2,
+                                       pFields1.size() == pFields2.size());
+
+                       FieldList reqPFields1 = join.getKeysForInput1();
+                       FieldList reqPFields2 = join.getKeysForInput2();
+
+                       for(int i=0; i<pFields1.size(); i++) {
+
+                               // get fields
+                               int f1 = pFields1.get(i);
+                               int f2 = pFields2.get(i);
+
+                               // check that field positions in original key 
field list are identical
+                               int pos1 = getPosInFieldList(f1, reqPFields1);
+                               int pos2 = getPosInFieldList(f2, reqPFields2);
+
+                               if(pos1 < 0) {
+                                       fail("Input 1 is partitioned on field 
"+f1+" which is not contained in the key set "+reqPFields1);
+                               }
+                               if(pos2 < 0) {
+                                       fail("Input 2 is partitioned on field 
"+f2+" which is not contained in the key set "+reqPFields2);
+                               }
+                               if(pos1 != pos2) {
+                                       fail("Inputs are not partitioned on the 
same key fields");
+                               }
+                       }
+
+               }
+               else if(inProps1.getPartitioning() == 
PartitioningProperty.FULL_REPLICATION &&
+                               inProps2.getPartitioning() == 
PartitioningProperty.RANDOM_PARTITIONED) {
+                       // we are good. No need to check for fields
+               }
+               else if(inProps1.getPartitioning() == 
PartitioningProperty.RANDOM_PARTITIONED &&
+                               inProps2.getPartitioning() == 
PartitioningProperty.FULL_REPLICATION) {
+                       // we are good. No need to check for fields
+               }
+               else {
+                       throw new UnsupportedOperationException("This method has only been implemented to check for hash partitioned or replicated join inputs");
+               }
+
+       }
+
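+       // Verifies that both coGroup inputs are hash-partitioned on corresponding key fields.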
+       private void checkValidCoGroupInputProperties(DualInputPlanNode 
coGroup) {
+
+               GlobalProperties inProps1 = 
coGroup.getInput1().getGlobalProperties();
+               GlobalProperties inProps2 = 
coGroup.getInput2().getGlobalProperties();
+
+               if(inProps1.getPartitioning() == 
PartitioningProperty.HASH_PARTITIONED &&
+                               inProps2.getPartitioning() == 
PartitioningProperty.HASH_PARTITIONED) {
+
+                       // check that both inputs are hash partitioned on the 
same fields
+                       FieldList pFields1 = inProps1.getPartitioningFields();
+                       FieldList pFields2 = inProps2.getPartitioningFields();
+
+                       assertTrue("Inputs are not partitioned on the same number of fields. Input 1: " + pFields1 + ", Input 2: " + pFields2,
+                                       pFields1.size() == pFields2.size());
+
+                       FieldList reqPFields1 = coGroup.getKeysForInput1();
+                       FieldList reqPFields2 = coGroup.getKeysForInput2();
+
+                       for(int i=0; i<pFields1.size(); i++) {
+
+                               // get fields
+                               int f1 = pFields1.get(i);
+                               int f2 = pFields2.get(i);
+
+                               // check that field positions in original key 
field list are identical
+                               int pos1 = getPosInFieldList(f1, reqPFields1);
+                               int pos2 = getPosInFieldList(f2, reqPFields2);
+
+                               if(pos1 < 0) {
+                                       fail("Input 1 is partitioned on field 
"+f1+" which is not contained in the key set "+reqPFields1);
+                               }
+                               if(pos2 < 0) {
+                                       fail("Input 2 is partitioned on field 
"+f2+" which is not contained in the key set "+reqPFields2);
+                               }
+                               if(pos1 != pos2) {
+                                       fail("Inputs are not partitioned on the 
same key fields");
+                               }
+                       }
+
+               }
+               else {
+                       throw new UnsupportedOperationException("This method has only been implemented to check for hash partitioned coGroup inputs");
+               }
+
+       }
+
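+       // Returns the position of 'field' in 'list', or -1 if the field is not contained.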
+       private int getPosInFieldList(int field, FieldList list) {
+
+               int pos;
+               for(pos=0; pos<list.size(); pos++) {
+                       if(field == list.get(pos)) {
+                               break;
+                       }
+               }
+               if(pos == list.size()) {
+                       return -1;
+               } else {
+                       return pos;
+               }
+
+       }
+
+
+
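+       // Mock UDFs: these tests only compile plans, so the functions never execute and need not produce meaningful output.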
+       public static class MockMapper implements MapFunction<Tuple3<Integer, 
Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
+               @Override
+               public Tuple3<Integer, Integer, Integer> map(Tuple3<Integer, 
Integer, Integer> value) throws Exception {
+                       return null;
+               }
+       }
+
+       public static class MockJoin implements JoinFunction<Tuple3<Integer, 
Integer, Integer>,
+                       Tuple3<Integer, Integer, Integer>, Tuple3<Integer, 
Integer, Integer>> {
+
+               @Override
+               public Tuple3<Integer, Integer, Integer> join(Tuple3<Integer, 
Integer, Integer> first, Tuple3<Integer, Integer, Integer> second) throws 
Exception {
+                       return null;
+               }
+       }
+
+       public static class MockCoGroup implements 
CoGroupFunction<Tuple3<Integer, Integer, Integer>,
+                               Tuple3<Integer, Integer, Integer>, 
Tuple3<Integer, Integer, Integer>> {
+
+               @Override
+               public void coGroup(Iterable<Tuple3<Integer, Integer, Integer>> 
first, Iterable<Tuple3<Integer, Integer, Integer>> second,
+                                                       
Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
+
+               }
+       }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
new file mode 100644
index 0000000..86f01b0
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
+import org.junit.Test;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.configuration.Configuration;
+
+@SuppressWarnings("serial")
+public class PipelineBreakerTest extends CompilerTestBase {
+
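+       // These tests check that the optimizer inserts pipeline-breaking temp barriers where a plan could otherwise
+       // deadlock, e.g. when a broadcast variable or both inputs of a binary operator stem from the same source.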
+       @Test
+       public void testPipelineBreakerWithBroadcastVariable() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(64);
+                       
+                       DataSet<Long> source = env.generateSequence(1, 
10).map(new IdentityMapper<Long>());
+                       
+                       DataSet<Long> result = source.map(new 
IdentityMapper<Long>())
+                                                                               
.map(new IdentityMapper<Long>())
+                                                                               
        .withBroadcastSet(source, "bc");
+                       
+                       result.print();
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       SinkPlanNode sink = op.getDataSinks().iterator().next();
+                       SingleInputPlanNode mapper = (SingleInputPlanNode) 
sink.getInput().getSource();
+                       
+                       
assertTrue(mapper.getInput().getTempMode().breaksPipeline());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testPipelineBreakerBroadcastedAllReduce() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(64);
+                       
+                       DataSet<Long> sourceWithMapper = 
env.generateSequence(1, 10).map(new IdentityMapper<Long>());
+                       
+                       DataSet<Long> bcInput1 = sourceWithMapper
+                                                                               
.map(new IdentityMapper<Long>())
+                                                                               
.reduce(new SelectOneReducer<Long>());
+                       DataSet<Long> bcInput2 = env.generateSequence(1, 10);
+                       
+                       DataSet<Long> result = sourceWithMapper
+                                       .map(new IdentityMapper<Long>())
+                                                       
.withBroadcastSet(bcInput1, "bc1")
+                                                       
.withBroadcastSet(bcInput2, "bc2");
+                       
+                       result.print();
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       SinkPlanNode sink = op.getDataSinks().iterator().next();
+                       SingleInputPlanNode mapper = (SingleInputPlanNode) 
sink.getInput().getSource();
+                       
+                       
assertTrue(mapper.getInput().getTempMode().breaksPipeline());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testPipelineBreakerBroadcastedPartialSolution() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       env.setDegreeOfParallelism(64);
+                       
+                       
+                       DataSet<Long> initialSource = env.generateSequence(1, 
10);
+                       IterativeDataSet<Long> iteration = 
initialSource.iterate(100);
+                       
+                       
+                       DataSet<Long> sourceWithMapper = 
env.generateSequence(1, 10).map(new IdentityMapper<Long>());
+                       
+                       DataSet<Long> bcInput1 = sourceWithMapper
+                                                                               
.map(new IdentityMapper<Long>())
+                                                                               
.reduce(new SelectOneReducer<Long>());
+                       
+                       DataSet<Long> result = sourceWithMapper
+                                       .map(new IdentityMapper<Long>())
+                                                       
.withBroadcastSet(iteration, "bc2")
+                                                       
.withBroadcastSet(bcInput1, "bc1");
+                                                       
+                       
+                       iteration.closeWith(result).print();
+                       
+                       Plan p = env.createProgramPlan();
+                       OptimizedPlan op = compileNoStats(p);
+                       
+                       SinkPlanNode sink = op.getDataSinks().iterator().next();
+                       BulkIterationPlanNode iterationPlanNode = 
(BulkIterationPlanNode) sink.getInput().getSource();
+                       SingleInputPlanNode mapper = (SingleInputPlanNode) 
iterationPlanNode.getRootOfStepFunction();
+                       
+                       
assertTrue(mapper.getInput().getTempMode().breaksPipeline());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
+       @Test
+       public void testPipelineBreakerWithCross() {
+               try {
+                       {
+                               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               env.setDegreeOfParallelism(64);
+                               
+                               DataSet<Long> initialSource = 
env.generateSequence(1, 10);
+                               
+                               Configuration conf = new Configuration();
+                               conf.setString(Optimizer.HINT_LOCAL_STRATEGY, 
Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST);
+                               initialSource
+                                       .map(new IdentityMapper<Long>())
+                                       
.cross(initialSource).withParameters(conf)
+                                       .print();
+                               
+                               
+                               Plan p = env.createProgramPlan();
+                               OptimizedPlan op = compileNoStats(p);
+                               SinkPlanNode sink = 
op.getDataSinks().iterator().next();
+                               DualInputPlanNode mapper = (DualInputPlanNode) 
sink.getInput().getSource();
+                               
+                               
assertTrue(mapper.getInput1().getTempMode().breaksPipeline());
+                       }
+                       
+                       {
+                               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               env.setDegreeOfParallelism(64);
+                               
+                               DataSet<Long> initialSource = 
env.generateSequence(1, 10);
+                               
+                               Configuration conf = new Configuration();
+                               conf.setString(Optimizer.HINT_LOCAL_STRATEGY, 
Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND);
+                               initialSource
+                                       .map(new IdentityMapper<Long>())
+                                       
.cross(initialSource).withParameters(conf)
+                                       .print();
+                               
+                               
+                               Plan p = env.createProgramPlan();
+                               OptimizedPlan op = compileNoStats(p);
+                               
+                               SinkPlanNode sink = 
op.getDataSinks().iterator().next();
+                               DualInputPlanNode mapper = (DualInputPlanNode) 
sink.getInput().getSource();
+                               
+                               
assertTrue(mapper.getInput2().getTempMode().breaksPipeline());
+                       }
+                       
+                       {
+                               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               env.setDegreeOfParallelism(64);
+                               
+                               DataSet<Long> initialSource = 
env.generateSequence(1, 10);
+                               
+                               Configuration conf = new Configuration();
+                               conf.setString(Optimizer.HINT_LOCAL_STRATEGY, 
Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST);
+                               initialSource
+                                       .map(new IdentityMapper<Long>())
+                                       
.cross(initialSource).withParameters(conf)
+                                       .print();
+                               
+                               
+                               Plan p = env.createProgramPlan();
+                               OptimizedPlan op = compileNoStats(p);
+                               
+                               SinkPlanNode sink = 
op.getDataSinks().iterator().next();
+                               DualInputPlanNode mapper = (DualInputPlanNode) 
sink.getInput().getSource();
+                               
+                               
assertTrue(mapper.getInput1().getTempMode().breaksPipeline());
+                       }
+                       
+                       {
+                               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               env.setDegreeOfParallelism(64);
+                               
+                               DataSet<Long> initialSource = 
env.generateSequence(1, 10);
+                               
+                               Configuration conf = new Configuration();
+                               conf.setString(Optimizer.HINT_LOCAL_STRATEGY, 
Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND);
+                               initialSource
+                                       .map(new IdentityMapper<Long>())
+                                       
.cross(initialSource).withParameters(conf)
+                                       .print();
+                               
+                               
+                               Plan p = env.createProgramPlan();
+                               OptimizedPlan op = compileNoStats(p);
+                               
+                               SinkPlanNode sink = 
op.getDataSinks().iterator().next();
+                               DualInputPlanNode mapper = (DualInputPlanNode) 
sink.getInput().getSource();
+                               
+                               
assertTrue(mapper.getInput2().getTempMode().breaksPipeline());
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
new file mode 100644
index 0000000..7be2b16
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
@@ -0,0 +1,897 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@SuppressWarnings({"serial"})
+public class PropertyDataSourceTest extends CompilerTestBase {
+
+       private List<Tuple3<Long, SomePojo, String>> tuple3PojoData = new 
ArrayList<Tuple3<Long, SomePojo, String>>();
+       private TupleTypeInfo<Tuple3<Long, SomePojo, String>> tuple3PojoType = 
new TupleTypeInfo<Tuple3<Long, SomePojo, String>>(
+                       BasicTypeInfo.LONG_TYPE_INFO,
+                       TypeExtractor.createTypeInfo(SomePojo.class),
+                       BasicTypeInfo.STRING_TYPE_INFO
+       );
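+
+       // Flat field positions in Tuple3<Long, SomePojo, String>: f0 -> 0, f1 (SomePojo: doubleField, intField, stringField) -> 1-3, f2 -> 4.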
+
+       @Test
+       public void checkSinglePartitionedSource1() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy(0);
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedSource2() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy(1, 0);
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedSource3() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("*");
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1, 
2, 3, 4)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedSource4() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("f1");
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1, 2, 
3)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedSource5() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("f1.stringField");
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(3)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedSource6() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("f1.intField; f2");
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2, 4)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedSource7() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("byDate", 1, 0);
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.CUSTOM_PARTITIONING);
+               Assert.assertTrue(gprops.getCustomPartitioner() != null);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedGroupedSource1() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy(0)
+                               .splitsGroupedBy(0);
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(new 
FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0)));
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedGroupedSource2() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy(0)
+                               .splitsGroupedBy(1, 0);
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(new 
FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0, 1)));
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+
+       @Test
+       public void checkSinglePartitionedGroupedSource3() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy(1)
+                               .splitsGroupedBy(0);
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedGroupedSource4() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy(0, 1)
+                               .splitsGroupedBy(0);
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedGroupedSource5() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("f2")
+                               .splitsGroupedBy("f2");
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(4)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(new 
FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(4)));
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+
+       @Test
+       public void checkSinglePartitionedGroupedSource6() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("f1.intField")
+                               .splitsGroupedBy("f0; f1.intField");
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(new 
FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0,2)));
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+
+       @Test
+       public void checkSinglePartitionedGroupedSource7() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("f1.intField")
+                               .splitsGroupedBy("f1");
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(new 
FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(1,2,3)));
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedGroupedSource8() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("f1")
+                               .splitsGroupedBy("f1.stringField");
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new 
FieldSet(1,2,3)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+
+       @Test
+       public void checkSinglePartitionedOrderedSource1() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy(1)
+                               .splitsOrderedBy(new int[]{1}, new 
Order[]{Order.ASCENDING});
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue((new 
FieldSet(lprops.getGroupedFields().toArray())).equals(new FieldSet(1)));
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedOrderedSource2() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy(1)
+                               .splitsOrderedBy(new int[]{1, 0}, new 
Order[]{Order.ASCENDING, Order.DESCENDING});
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue((new 
FieldSet(lprops.getGroupedFields().toArray())).equals(new FieldSet(1, 0)));
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+
+       @Test
+       public void checkSinglePartitionedOrderedSource3() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy(0)
+                               .splitsOrderedBy(new int[]{1}, new 
Order[]{Order.ASCENDING});
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedOrderedSource4() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy(0, 1)
+                               .splitsOrderedBy(new int[]{1}, new 
Order[]{Order.DESCENDING});
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedOrderedSource5() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                       .splitsPartitionedBy("f1.intField")
+                       .splitsOrderedBy("f0; f1.intField", new 
Order[]{Order.ASCENDING, Order.DESCENDING});
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(new 
FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0,2)));
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+
+       @Test
+       public void checkSinglePartitionedOrderedSource6() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("f1.intField")
+                               .splitsOrderedBy("f1", new 
Order[]{Order.DESCENDING});
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(new 
FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(1,2,3)));
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+       @Test
+       public void checkSinglePartitionedOrderedSource7() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple3<Long, SomePojo, String>> data = 
env.fromCollection(tuple3PojoData, tuple3PojoType);
+
+               data.getSplitDataProperties()
+                               .splitsPartitionedBy("f1")
+                               .splitsOrderedBy("f1.stringField", new 
Order[]{Order.ASCENDING});
+
+               data.print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor();
+
+               GlobalProperties gprops = sourceNode.getGlobalProperties();
+               LocalProperties lprops = sourceNode.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new 
FieldSet(1,2,3)));
+               Assert.assertTrue(gprops.getPartitioning() == 
PartitioningProperty.ANY_PARTITIONING);
+               Assert.assertTrue(lprops.getGroupedFields() == null);
+               Assert.assertTrue(lprops.getOrdering() == null);
+
+       }
+
+
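+       // Two sources are only treated as co-partitioned if their splits were partitioned by the same custom partitioner identifier.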
+       @Test
+       public void checkCoPartitionedSources1() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data1 =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data1.getSplitDataProperties()
+                               .splitsPartitionedBy("byDate", 0);
+
+               DataSource<Tuple2<Long, String>> data2 =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data2.getSplitDataProperties()
+                               .splitsPartitionedBy("byDate", 0);
+
+               data1.union(data2).print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode1 = (SourcePlanNode) 
((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(0).getSource();
+               SourcePlanNode sourceNode2 = (SourcePlanNode) 
((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(1).getSource();
+
+               GlobalProperties gprops1 = sourceNode1.getGlobalProperties();
+               LocalProperties lprops1 = sourceNode1.getLocalProperties();
+               GlobalProperties gprops2 = sourceNode2.getGlobalProperties();
+               LocalProperties lprops2 = sourceNode2.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops1.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+               Assert.assertTrue(gprops1.getPartitioning() == 
PartitioningProperty.CUSTOM_PARTITIONING);
+               Assert.assertTrue(lprops1.getGroupedFields() == null);
+               Assert.assertTrue(lprops1.getOrdering() == null);
+
+               Assert.assertTrue((new 
FieldSet(gprops2.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+               Assert.assertTrue(gprops2.getPartitioning() == 
PartitioningProperty.CUSTOM_PARTITIONING);
+               Assert.assertTrue(lprops2.getGroupedFields() == null);
+               Assert.assertTrue(lprops2.getOrdering() == null);
+
+               
Assert.assertTrue(gprops1.getCustomPartitioner().equals(gprops2.getCustomPartitioner()));
+       }
+
+       @Test
+       public void checkCoPartitionedSources2() {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment();
+               env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+               DataSource<Tuple2<Long, String>> data1 =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data1.getSplitDataProperties()
+                               .splitsPartitionedBy("byCountry", 0);
+
+               DataSource<Tuple2<Long, String>> data2 =
+                               env.readCsvFile("/some/path").types(Long.class, 
String.class);
+
+               data2.getSplitDataProperties()
+                               .splitsPartitionedBy("byDate", 0);
+
+               data1.union(data2).print();
+
+               JavaPlan plan = env.createProgramPlan();
+
+               // submit the plan to the compiler
+               OptimizedPlan oPlan = compileNoStats(plan);
+
+               // check the optimized Plan
+               SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+               SourcePlanNode sourceNode1 = (SourcePlanNode) 
((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(0).getSource();
+               SourcePlanNode sourceNode2 = (SourcePlanNode) 
((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(1).getSource();
+
+               GlobalProperties gprops1 = sourceNode1.getGlobalProperties();
+               LocalProperties lprops1 = sourceNode1.getLocalProperties();
+               GlobalProperties gprops2 = sourceNode2.getGlobalProperties();
+               LocalProperties lprops2 = sourceNode2.getLocalProperties();
+
+               Assert.assertTrue((new 
FieldSet(gprops1.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+               Assert.assertTrue(gprops1.getPartitioning() == 
PartitioningProperty.CUSTOM_PARTITIONING);
+               Assert.assertTrue(lprops1.getGroupedFields() == null);
+               Assert.assertTrue(lprops1.getOrdering() == null);
+
+               Assert.assertTrue((new 
FieldSet(gprops2.getPartitioningFields().toArray())).equals(new FieldSet(0)));
+               Assert.assertTrue(gprops2.getPartitioning() == 
PartitioningProperty.CUSTOM_PARTITIONING);
+               Assert.assertTrue(lprops2.getGroupedFields() == null);
+               Assert.assertTrue(lprops2.getOrdering() == null);
+
+               
Assert.assertTrue(!gprops1.getCustomPartitioner().equals(gprops2.getCustomPartitioner()));
+       }
+
+
+       public static class SomePojo {
+               public double doubleField;
+               public int intField;
+               public String stringField;
+       }
+
+}
+
+
