http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
deleted file mode 100644
index 0273659..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.custompartition;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.junit.Test;
-
-@SuppressWarnings({"serial","unchecked"})
-public class BinaryCustomPartitioningCompatibilityTest extends CompilerTestBase {
-
-       @Test
-       public void testCompatiblePartitioningJoin() {
-               try {
-                       final Partitioner<Long> partitioner = new 
Partitioner<Long>() {
-                               @Override
-                               public int partition(Long key, int 
numPartitions) {
-                                       return 0;
-                               }
-                       };
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple2<Long, Long>> input1 = 
env.fromElements(new Tuple2<Long, Long>(0L, 0L));
-                       DataSet<Tuple3<Long, Long, Long>> input2 = 
env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
-                       
-                       input1.partitionCustom(partitioner, 1)
-                               .join(input2.partitionCustom(partitioner, 0))
-                               .where(1).equalTo(0)
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       DualInputPlanNode join = (DualInputPlanNode) 
sink.getInput().getSource();
-                       SingleInputPlanNode partitioner1 = 
(SingleInputPlanNode) join.getInput1().getSource();
-                       SingleInputPlanNode partitioner2 = 
(SingleInputPlanNode) join.getInput2().getSource();
-
-                       assertEquals(ShipStrategyType.FORWARD, 
join.getInput1().getShipStrategy());
-                       assertEquals(ShipStrategyType.FORWARD, 
join.getInput2().getShipStrategy());
-                       
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
partitioner1.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
partitioner2.getInput().getShipStrategy());
-                       assertEquals(partitioner, 
partitioner1.getInput().getPartitioner());
-                       assertEquals(partitioner, 
partitioner2.getInput().getPartitioner());
-                       
-                       new JobGraphGenerator().compileJobGraph(op);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCompatiblePartitioningCoGroup() {
-               try {
-                       final Partitioner<Long> partitioner = new 
Partitioner<Long>() {
-                               @Override
-                               public int partition(Long key, int 
numPartitions) {
-                                       return 0;
-                               }
-                       };
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple2<Long, Long>> input1 = 
env.fromElements(new Tuple2<Long, Long>(0L, 0L));
-                       DataSet<Tuple3<Long, Long, Long>> input2 = 
env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
-                       
-                       input1.partitionCustom(partitioner, 1)
-                               .coGroup(input2.partitionCustom(partitioner, 0))
-                               .where(1).equalTo(0)
-                               .with(new DummyCoGroupFunction<Tuple2<Long, 
Long>, Tuple3<Long, Long, Long>>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       DualInputPlanNode coGroup = (DualInputPlanNode) 
sink.getInput().getSource();
-                       SingleInputPlanNode partitioner1 = 
(SingleInputPlanNode) coGroup.getInput1().getSource();
-                       SingleInputPlanNode partitioner2 = 
(SingleInputPlanNode) coGroup.getInput2().getSource();
-
-                       assertEquals(ShipStrategyType.FORWARD, 
coGroup.getInput1().getShipStrategy());
-                       assertEquals(ShipStrategyType.FORWARD, 
coGroup.getInput2().getShipStrategy());
-                       
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
partitioner1.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
partitioner2.getInput().getShipStrategy());
-                       assertEquals(partitioner, 
partitioner1.getInput().getPartitioner());
-                       assertEquals(partitioner, 
partitioner2.getInput().getPartitioner());
-                       
-                       new JobGraphGenerator().compileJobGraph(op);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
deleted file mode 100644
index 08f7388..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.custompartition;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityMapper;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.junit.Test;
-
-@SuppressWarnings({"serial", "unchecked"})
-public class CoGroupCustomPartitioningTest extends CompilerTestBase {
-
-       @Test
-       public void testCoGroupWithTuples() {
-               try {
-                       final Partitioner<Long> partitioner = new 
TestPartitionerLong();
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple2<Long, Long>> input1 = 
env.fromElements(new Tuple2<Long, Long>(0L, 0L));
-                       DataSet<Tuple3<Long, Long, Long>> input2 = 
env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
-                       
-                       input1
-                               .coGroup(input2)
-                               .where(1).equalTo(0)
-                               .withPartitioner(partitioner)
-                               .with(new DummyCoGroupFunction<Tuple2<Long, 
Long>, Tuple3<Long, Long, Long>>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       DualInputPlanNode join = (DualInputPlanNode) 
sink.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
join.getInput1().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
join.getInput2().getShipStrategy());
-                       assertEquals(partitioner, 
join.getInput1().getPartitioner());
-                       assertEquals(partitioner, 
join.getInput2().getPartitioner());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCoGroupWithTuplesWrongType() {
-               try {
-                       final Partitioner<Integer> partitioner = new 
TestPartitionerInt();
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple2<Long, Long>> input1 = 
env.fromElements(new Tuple2<Long, Long>(0L, 0L));
-                       DataSet<Tuple3<Long, Long, Long>> input2 = 
env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
-                       
-                       try {
-                               input1
-                                       .coGroup(input2)
-                                       .where(1).equalTo(0)
-                                       .withPartitioner(partitioner);
-                               fail("should throw an exception");
-                       }
-                       catch (InvalidProgramException e) {
-                               // expected
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCoGroupWithPojos() {
-               try {
-                       final Partitioner<Integer> partitioner = new 
TestPartitionerInt();
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Pojo2> input1 = env.fromElements(new Pojo2());
-                       DataSet<Pojo3> input2 = env.fromElements(new Pojo3());
-                       
-                       input1
-                               .coGroup(input2)
-                               .where("b").equalTo("a")
-                               .withPartitioner(partitioner)
-                               .with(new DummyCoGroupFunction<Pojo2, Pojo3>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       DualInputPlanNode join = (DualInputPlanNode) 
sink.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
join.getInput1().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
join.getInput2().getShipStrategy());
-                       assertEquals(partitioner, 
join.getInput1().getPartitioner());
-                       assertEquals(partitioner, 
join.getInput2().getPartitioner());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCoGroupWithPojosWrongType() {
-               try {
-                       final Partitioner<Long> partitioner = new 
TestPartitionerLong();
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Pojo2> input1 = env.fromElements(new Pojo2());
-                       DataSet<Pojo3> input2 = env.fromElements(new Pojo3());
-                       
-                       try {
-                               input1
-                                       .coGroup(input2)
-                                       .where("a").equalTo("b")
-                                       .withPartitioner(partitioner);
-                               
-                               fail("should throw an exception");
-                       }
-                       catch (InvalidProgramException e) {
-                               // expected
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCoGroupWithKeySelectors() {
-               try {
-                       final Partitioner<Integer> partitioner = new 
TestPartitionerInt();
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Pojo2> input1 = env.fromElements(new Pojo2());
-                       DataSet<Pojo3> input2 = env.fromElements(new Pojo3());
-                       
-                       input1
-                               .coGroup(input2)
-                               .where(new Pojo2KeySelector()).equalTo(new 
Pojo3KeySelector())
-                               .withPartitioner(partitioner)
-                               .with(new DummyCoGroupFunction<Pojo2, Pojo3>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       DualInputPlanNode join = (DualInputPlanNode) 
sink.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
join.getInput1().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
join.getInput2().getShipStrategy());
-                       assertEquals(partitioner, 
join.getInput1().getPartitioner());
-                       assertEquals(partitioner, 
join.getInput2().getPartitioner());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCoGroupWithKeySelectorsWrongType() {
-               try {
-                       final Partitioner<Long> partitioner = new 
TestPartitionerLong();
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Pojo2> input1 = env.fromElements(new Pojo2());
-                       DataSet<Pojo3> input2 = env.fromElements(new Pojo3());
-                       
-                       try {
-                               input1
-                                       .coGroup(input2)
-                                       .where(new 
Pojo2KeySelector()).equalTo(new Pojo3KeySelector())
-                                       .withPartitioner(partitioner);
-                               
-                               fail("should throw an exception");
-                       }
-                       catch (InvalidProgramException e) {
-                               // expected
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testIncompatibleHashAndCustomPartitioning() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple3<Long, Long, Long>> input = 
env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
-                       
-                       DataSet<Tuple3<Long, Long, Long>> partitioned = input
-                               .partitionCustom(new Partitioner<Long>() {
-                                       @Override
-                                       public int partition(Long key, int 
numPartitions) { return 0; }
-                               }, 0)
-                               .map(new 
IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2");
-                               
-                       
-                       DataSet<Tuple3<Long, Long, Long>> grouped = partitioned
-                               .distinct(0, 1)
-                               .groupBy(1)
-                               .sortGroup(0, Order.ASCENDING)
-                               .reduceGroup(new 
IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
-                       
-                       grouped
-                               .coGroup(partitioned).where(0).equalTo(0)
-                               .with(new 
DummyCoGroupFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       DualInputPlanNode coGroup = (DualInputPlanNode) 
sink.getInput().getSource();
-
-                       assertEquals(ShipStrategyType.PARTITION_HASH, 
coGroup.getInput1().getShipStrategy());
-                       assertTrue(coGroup.getInput2().getShipStrategy() == ShipStrategyType.PARTITION_HASH || 
-                                               coGroup.getInput2().getShipStrategy() == ShipStrategyType.FORWARD);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       // --------------------------------------------------------------------------------------------
-       
-       private static class TestPartitionerInt implements Partitioner<Integer> {
-               @Override
-               public int partition(Integer key, int numPartitions) {
-                       return 0;
-               }
-       }
-       
-       private static class TestPartitionerLong implements Partitioner<Long> {
-               @Override
-               public int partition(Long key, int numPartitions) {
-                       return 0;
-               }
-       }
-       
-       public static class Pojo2 {
-               public int a;
-               public int b;
-       }
-       
-       public static class Pojo3 {
-               public int a;
-               public int b;
-               public int c;
-       }
-       
-       private static class Pojo2KeySelector implements KeySelector<Pojo2, Integer> {
-               @Override
-               public Integer getKey(Pojo2 value) {
-                       return value.a;
-               }
-       }
-       
-       private static class Pojo3KeySelector implements KeySelector<Pojo3, Integer> {
-               @Override
-               public Integer getKey(Pojo3 value) {
-                       return value.b;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
deleted file mode 100644
index 9fd676f..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.custompartition;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.junit.Test;
-
-@SuppressWarnings({"serial", "unchecked"})
-public class CustomPartitioningGlobalOptimizationTest extends CompilerTestBase {
-
-       @Test
-       public void testJoinReduceCombination() {
-               try {
-                       final Partitioner<Long> partitioner = new 
TestPartitionerLong();
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple2<Long, Long>> input1 = 
env.fromElements(new Tuple2<Long, Long>(0L, 0L));
-                       DataSet<Tuple3<Long, Long, Long>> input2 = 
env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
-
-                       DataSet<Tuple3<Long, Long, Long>> joined = 
input1.join(input2)
-                               .where(1).equalTo(0)
-                               .projectFirst(0, 1)
-                               .<Tuple3<Long, Long, Long>>projectSecond(2)
-                               .withPartitioner(partitioner);
-
-                       joined.groupBy(1).withPartitioner(partitioner)
-                               .reduceGroup(new 
IdentityGroupReducer<Tuple3<Long,Long,Long>>())
-                               .print();
-
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       SingleInputPlanNode reducer = (SingleInputPlanNode) 
sink.getInput().getSource();
-                       
-                       assertTrue("Reduce is not chained, property reuse does not happen", 
-                                       reducer.getInput().getSource() instanceof DualInputPlanNode);
-                       
-                       DualInputPlanNode join = (DualInputPlanNode) 
reducer.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
join.getInput1().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
join.getInput2().getShipStrategy());
-                       assertEquals(partitioner, 
join.getInput1().getPartitioner());
-                       assertEquals(partitioner, 
join.getInput2().getPartitioner());
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
reducer.getInput().getShipStrategy());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       // --------------------------------------------------------------------------------------------
-       
-       private static class TestPartitionerLong implements Partitioner<Long> {
-               @Override
-               public int partition(Long key, int numPartitions) {
-                       return 0;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
deleted file mode 100644
index d397ea2..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.custompartition;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityPartitionerMapper;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.junit.Test;
-
-@SuppressWarnings({"serial", "unchecked"})
-public class CustomPartitioningTest extends CompilerTestBase {
-
-       @Test
-       public void testPartitionTuples() {
-               try {
-                       final Partitioner<Integer> part = new 
TestPartitionerInt();
-                       final int parallelism = 4;
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(parallelism);
-                       
-                       DataSet<Tuple2<Integer, Integer>> data = 
env.fromElements(new Tuple2<Integer,Integer>(0, 0))
-                                       .rebalance();
-                       
-                       data
-                               .partitionCustom(part, 0)
-                               .mapPartition(new 
IdentityPartitionerMapper<Tuple2<Integer,Integer>>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       SingleInputPlanNode mapper = (SingleInputPlanNode) 
sink.getInput().getSource();
-                       SingleInputPlanNode partitioner = (SingleInputPlanNode) 
mapper.getInput().getSource();
-                       SingleInputPlanNode balancer = (SingleInputPlanNode) 
partitioner.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(parallelism, sink.getParallelism());
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
mapper.getInput().getShipStrategy());
-                       assertEquals(parallelism, mapper.getParallelism());
-                       
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
partitioner.getInput().getShipStrategy());
-                       assertEquals(part, 
partitioner.getInput().getPartitioner());
-                       assertEquals(parallelism, partitioner.getParallelism());
-                       
-                       
assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, 
balancer.getInput().getShipStrategy());
-                       assertEquals(parallelism, balancer.getParallelism());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testPartitionTuplesInvalidType() {
-               try {
-                       final int parallelism = 4;
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(parallelism);
-                       
-                       DataSet<Tuple2<Integer, Integer>> data = 
env.fromElements(new Tuple2<Integer,Integer>(0, 0))
-                                       .rebalance();
-                       
-                       try {
-                               data
-                                       .partitionCustom(new 
TestPartitionerLong(), 0);
-                               fail("Should throw an exception");
-                       }
-                       catch (InvalidProgramException e) {
-                               // expected
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testPartitionPojo() {
-               try {
-                       final Partitioner<Integer> part = new 
TestPartitionerInt();
-                       final int parallelism = 4;
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(parallelism);
-                       
-                       DataSet<Pojo> data = env.fromElements(new Pojo())
-                                       .rebalance();
-                       
-                       data
-                               .partitionCustom(part, "a")
-                               .mapPartition(new 
IdentityPartitionerMapper<Pojo>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       SingleInputPlanNode mapper = (SingleInputPlanNode) 
sink.getInput().getSource();
-                       SingleInputPlanNode partitioner = (SingleInputPlanNode) 
mapper.getInput().getSource();
-                       SingleInputPlanNode balancer = (SingleInputPlanNode) 
partitioner.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(parallelism, sink.getParallelism());
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
mapper.getInput().getShipStrategy());
-                       assertEquals(parallelism, mapper.getParallelism());
-                       
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
partitioner.getInput().getShipStrategy());
-                       assertEquals(part, 
partitioner.getInput().getPartitioner());
-                       assertEquals(parallelism, partitioner.getParallelism());
-                       
-                       
assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, 
balancer.getInput().getShipStrategy());
-                       assertEquals(parallelism, balancer.getParallelism());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testPartitionPojoInvalidType() {
-               try {
-                       final int parallelism = 4;
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(parallelism);
-                       
-                       DataSet<Pojo> data = env.fromElements(new Pojo())
-                                       .rebalance();
-                       
-                       try {
-                               data
-                                       .partitionCustom(new 
TestPartitionerLong(), "a");
-                               fail("Should throw an exception");
-                       }
-                       catch (InvalidProgramException e) {
-                               // expected
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testPartitionKeySelector() {
-               try {
-                       final Partitioner<Integer> part = new 
TestPartitionerInt();
-                       final int parallelism = 4;
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(parallelism);
-                       
-                       DataSet<Pojo> data = env.fromElements(new Pojo())
-                                       .rebalance();
-                       
-                       data
-                               .partitionCustom(part, new 
TestKeySelectorInt<Pojo>())
-                               .mapPartition(new 
IdentityPartitionerMapper<Pojo>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       SingleInputPlanNode mapper = (SingleInputPlanNode) 
sink.getInput().getSource();
-                       SingleInputPlanNode keyRemover = (SingleInputPlanNode) 
mapper.getInput().getSource();
-                       SingleInputPlanNode partitioner = (SingleInputPlanNode) 
keyRemover.getInput().getSource();
-                       SingleInputPlanNode keyExtractor = 
(SingleInputPlanNode) partitioner.getInput().getSource();
-                       SingleInputPlanNode balancer = (SingleInputPlanNode) 
keyExtractor.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(parallelism, sink.getParallelism());
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
mapper.getInput().getShipStrategy());
-                       assertEquals(parallelism, mapper.getParallelism());
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
keyRemover.getInput().getShipStrategy());
-                       assertEquals(parallelism, keyRemover.getParallelism());
-                       
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
partitioner.getInput().getShipStrategy());
-                       assertEquals(part, 
partitioner.getInput().getPartitioner());
-                       assertEquals(parallelism, partitioner.getParallelism());
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
keyExtractor.getInput().getShipStrategy());
-                       assertEquals(parallelism, 
keyExtractor.getParallelism());
-                       
-                       
assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, 
balancer.getInput().getShipStrategy());
-                       assertEquals(parallelism, balancer.getParallelism());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testPartitionKeySelectorInvalidType() {
-               try {
-                       final Partitioner<Integer> part = 
(Partitioner<Integer>) (Partitioner<?>) new TestPartitionerLong();
-                       final int parallelism = 4;
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       env.setDegreeOfParallelism(parallelism);
-                       
-                       DataSet<Pojo> data = env.fromElements(new Pojo())
-                                       .rebalance();
-                       
-                       try {
-                               data
-                                       .partitionCustom(part, new 
TestKeySelectorInt<Pojo>());
-                               fail("Should throw an exception");
-                       }
-                       catch (InvalidProgramException e) {
-                               // expected
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public static class Pojo {
-               public int a;
-               public int b;
-       }
-       
-       private static class TestPartitionerInt implements Partitioner<Integer> 
{
-               @Override
-               public int partition(Integer key, int numPartitions) {
-                       return 0;
-               }
-       }
-       
-       private static class TestPartitionerLong implements Partitioner<Long> {
-               @Override
-               public int partition(Long key, int numPartitions) {
-                       return 0;
-               }
-       }
-       
-       private static class TestKeySelectorInt<T> implements KeySelector<T, 
Integer> {
-               @Override
-               public Integer getKey(T value) {
-                       return null;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
deleted file mode 100644
index 360487b..0000000
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.custompartition;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.DummyReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.junit.Test;
-
-@SuppressWarnings({"serial", "unchecked"})
-public class GroupingKeySelectorTranslationTest extends CompilerTestBase {
-       
-       @Test
-       public void testCustomPartitioningKeySelectorReduce() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple2<Integer, Integer>> data = 
env.fromElements(new Tuple2<Integer, Integer>(0, 0))
-                                       .rebalance().setParallelism(4);
-                       
-                       data.groupBy(new 
TestKeySelector<Tuple2<Integer,Integer>>())
-                               .withPartitioner(new TestPartitionerInt())
-                               .reduce(new 
DummyReducer<Tuple2<Integer,Integer>>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       SingleInputPlanNode keyRemovingMapper = 
(SingleInputPlanNode) sink.getInput().getSource();
-                       SingleInputPlanNode reducer = (SingleInputPlanNode) 
keyRemovingMapper.getInput().getSource();
-                       SingleInputPlanNode combiner = (SingleInputPlanNode) 
reducer.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.FORWARD, 
keyRemovingMapper.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
reducer.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.FORWARD, 
combiner.getInput().getShipStrategy());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCustomPartitioningKeySelectorGroupReduce() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple2<Integer, Integer>> data = 
env.fromElements(new Tuple2<Integer, Integer>(0, 0))
-                                       .rebalance().setParallelism(4);
-                       
-                       data.groupBy(new 
TestKeySelector<Tuple2<Integer,Integer>>())
-                               .withPartitioner(new TestPartitionerInt())
-                               .reduceGroup(new 
IdentityGroupReducer<Tuple2<Integer,Integer>>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       SingleInputPlanNode reducer = (SingleInputPlanNode) 
sink.getInput().getSource();
-                       SingleInputPlanNode combiner = (SingleInputPlanNode) 
reducer.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
reducer.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.FORWARD, 
combiner.getInput().getShipStrategy());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCustomPartitioningKeySelectorGroupReduceSorted() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple3<Integer, Integer, Integer>> data = 
env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
-                                       .rebalance().setParallelism(4);
-                       
-                       data.groupBy(new 
TestKeySelector<Tuple3<Integer,Integer,Integer>>())
-                               .withPartitioner(new TestPartitionerInt())
-                               .sortGroup(new TestKeySelector<Tuple3<Integer, 
Integer, Integer>>(), Order.ASCENDING)
-                               .reduceGroup(new 
IdentityGroupReducer<Tuple3<Integer,Integer,Integer>>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       SingleInputPlanNode reducer = (SingleInputPlanNode) 
sink.getInput().getSource();
-                       SingleInputPlanNode combiner = (SingleInputPlanNode) 
reducer.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
reducer.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.FORWARD, 
combiner.getInput().getShipStrategy());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCustomPartitioningKeySelectorInvalidType() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple2<Integer, Integer>> data = 
env.fromElements(new Tuple2<Integer, Integer>(0, 0))
-                                       .rebalance().setParallelism(4);
-                       
-                       try {
-                               data
-                                       .groupBy(new 
TestKeySelector<Tuple2<Integer,Integer>>())
-                                       .withPartitioner(new 
TestPartitionerLong());
-                               fail("Should throw an exception");
-                       }
-                       catch (InvalidProgramException e) {}
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCustomPartitioningKeySelectorInvalidTypeSorted() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple3<Integer, Integer, Integer>> data = 
env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
-                                       .rebalance().setParallelism(4);
-                       
-                       try {
-                               data
-                                       .groupBy(new 
TestKeySelector<Tuple3<Integer,Integer,Integer>>())
-                                       .sortGroup(1, Order.ASCENDING)
-                                       .withPartitioner(new 
TestPartitionerLong());
-                               fail("Should throw an exception");
-                       }
-                       catch (InvalidProgramException e) {}
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCustomPartitioningTupleRejectCompositeKey() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple3<Integer, Integer, Integer>> data = 
env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
-                                       .rebalance().setParallelism(4);
-                       
-                       try {
-                               data
-                                       .groupBy(new 
TestBinaryKeySelector<Tuple3<Integer,Integer,Integer>>())
-                                       .withPartitioner(new 
TestPartitionerInt());
-                               fail("Should throw an exception");
-                       }
-                       catch (InvalidProgramException e) {}
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       private static class TestPartitionerInt implements Partitioner<Integer> 
{
-               @Override
-               public int partition(Integer key, int numPartitions) {
-                       return 0;
-               }
-       }
-       
-       private static class TestPartitionerLong implements Partitioner<Long> {
-               @Override
-               public int partition(Long key, int numPartitions) {
-                       return 0;
-               }
-       }
-       
-       private static class TestKeySelector<T extends Tuple> implements 
KeySelector<T, Integer> {
-               @Override
-               public Integer getKey(T value) {
-                       return value.getField(0);
-               }
-       }
-       
-       private static class TestBinaryKeySelector<T extends Tuple> implements 
KeySelector<T, Tuple2<Integer, Integer>> {
-               @Override
-               public Tuple2<Integer, Integer> getKey(T value) {
-                       return new Tuple2<Integer, 
Integer>(value.<Integer>getField(0), value.<Integer>getField(1));
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
deleted file mode 100644
index 8cd4809..0000000
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.custompartition;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.DummyReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class GroupingPojoTranslationTest extends CompilerTestBase {
-       
-       @Test
-       public void testCustomPartitioningTupleReduce() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Pojo2> data = env.fromElements(new Pojo2())
-                                       .rebalance().setParallelism(4);
-                       
-                       data.groupBy("a").withPartitioner(new 
TestPartitionerInt())
-                               .reduce(new DummyReducer<Pojo2>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       SingleInputPlanNode reducer = (SingleInputPlanNode) 
sink.getInput().getSource();
-                       SingleInputPlanNode combiner = (SingleInputPlanNode) 
reducer.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
reducer.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.FORWARD, 
combiner.getInput().getShipStrategy());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCustomPartitioningTupleGroupReduce() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Pojo2> data = env.fromElements(new Pojo2())
-                                       .rebalance().setParallelism(4);
-                       
-                       data.groupBy("a").withPartitioner(new 
TestPartitionerInt())
-                               .reduceGroup(new IdentityGroupReducer<Pojo2>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       SingleInputPlanNode reducer = (SingleInputPlanNode) 
sink.getInput().getSource();
-                       SingleInputPlanNode combiner = (SingleInputPlanNode) 
reducer.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
reducer.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.FORWARD, 
combiner.getInput().getShipStrategy());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCustomPartitioningTupleGroupReduceSorted() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Pojo3> data = env.fromElements(new Pojo3())
-                                       .rebalance().setParallelism(4);
-                       
-                       data.groupBy("a").withPartitioner(new 
TestPartitionerInt())
-                               .sortGroup("b", Order.ASCENDING)
-                               .reduceGroup(new IdentityGroupReducer<Pojo3>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       SingleInputPlanNode reducer = (SingleInputPlanNode) 
sink.getInput().getSource();
-                       SingleInputPlanNode combiner = (SingleInputPlanNode) 
reducer.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
reducer.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.FORWARD, 
combiner.getInput().getShipStrategy());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCustomPartitioningTupleGroupReduceSorted2() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Pojo4> data = env.fromElements(new Pojo4())
-                                       .rebalance().setParallelism(4);
-                       
-                       data.groupBy("a").withPartitioner(new 
TestPartitionerInt())
-                               .sortGroup("b", Order.ASCENDING)
-                               .sortGroup("c", Order.DESCENDING)
-                               .reduceGroup(new IdentityGroupReducer<Pojo4>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       SingleInputPlanNode reducer = (SingleInputPlanNode) 
sink.getInput().getSource();
-                       SingleInputPlanNode combiner = (SingleInputPlanNode) 
reducer.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
reducer.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.FORWARD, 
combiner.getInput().getShipStrategy());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCustomPartitioningTupleInvalidType() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Pojo2> data = env.fromElements(new Pojo2())
-                                       .rebalance().setParallelism(4);
-                       
-                       try {
-                               data.groupBy("a").withPartitioner(new 
TestPartitionerLong());
-                               fail("Should throw an exception");
-                       }
-                       catch (InvalidProgramException e) {}
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCustomPartitioningTupleInvalidTypeSorted() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Pojo3> data = env.fromElements(new Pojo3())
-                                       .rebalance().setParallelism(4);
-                       
-                       try {
-                               data.groupBy("a")
-                                       .sortGroup("b", Order.ASCENDING)
-                                       .withPartitioner(new 
TestPartitionerLong());
-                               fail("Should throw an exception");
-                       }
-                       catch (InvalidProgramException e) {}
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCustomPartitioningTupleRejectCompositeKey() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Pojo2> data = env.fromElements(new Pojo2())
-                                       .rebalance().setParallelism(4);
-                       
-                       try {
-                               data.groupBy("a", "b")
-                                       .withPartitioner(new 
TestPartitionerInt());
-                               fail("Should throw an exception");
-                       }
-                       catch (InvalidProgramException e) {}
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public static class Pojo2 {
-               public int a;
-               public int b;
-               
-       }
-       
-       public static class Pojo3 {
-               public int a;
-               public int b;
-               public int c;
-       }
-       
-       public static class Pojo4 {
-               public int a;
-               public int b;
-               public int c;
-               public int d;
-       }
-       
-       private static class TestPartitionerInt implements Partitioner<Integer> 
{
-               @Override
-               public int partition(Integer key, int numPartitions) {
-                       return 0;
-               }
-       }
-       
-       private static class TestPartitionerLong implements Partitioner<Long> {
-               @Override
-               public int partition(Long key, int numPartitions) {
-                       return 0;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
deleted file mode 100644
index 779b8e5..0000000
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.custompartition;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.DummyReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.junit.Test;
-
-@SuppressWarnings({"serial", "unchecked"})
-public class GroupingTupleTranslationTest extends CompilerTestBase {
-       
-       @Test
-       public void testCustomPartitioningTupleAgg() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple2<Integer, Integer>> data = 
env.fromElements(new Tuple2<Integer, Integer>(0, 0))
-                                       .rebalance().setParallelism(4);
-                       
-                       data.groupBy(0).withPartitioner(new 
TestPartitionerInt())
-                               .sum(1)
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       SingleInputPlanNode reducer = (SingleInputPlanNode) 
sink.getInput().getSource();
-                       SingleInputPlanNode combiner = (SingleInputPlanNode) 
reducer.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
reducer.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.FORWARD, 
combiner.getInput().getShipStrategy());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCustomPartitioningTupleReduce() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple2<Integer, Integer>> data = 
env.fromElements(new Tuple2<Integer, Integer>(0, 0))
-                                       .rebalance().setParallelism(4);
-                       
-                       data.groupBy(0).withPartitioner(new 
TestPartitionerInt())
-                               .reduce(new 
DummyReducer<Tuple2<Integer,Integer>>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       SingleInputPlanNode reducer = (SingleInputPlanNode) 
sink.getInput().getSource();
-                       SingleInputPlanNode combiner = (SingleInputPlanNode) 
reducer.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
reducer.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.FORWARD, 
combiner.getInput().getShipStrategy());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCustomPartitioningTupleGroupReduce() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple2<Integer, Integer>> data = 
env.fromElements(new Tuple2<Integer, Integer>(0, 0))
-                                       .rebalance().setParallelism(4);
-                       
-                       data.groupBy(0).withPartitioner(new 
TestPartitionerInt())
-                               .reduceGroup(new 
IdentityGroupReducer<Tuple2<Integer,Integer>>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       SingleInputPlanNode reducer = (SingleInputPlanNode) 
sink.getInput().getSource();
-                       SingleInputPlanNode combiner = (SingleInputPlanNode) 
reducer.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
reducer.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.FORWARD, 
combiner.getInput().getShipStrategy());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCustomPartitioningTupleGroupReduceSorted() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple3<Integer, Integer, Integer>> data = 
env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
-                                       .rebalance().setParallelism(4);
-                       
-                       data.groupBy(0).withPartitioner(new 
TestPartitionerInt())
-                               .sortGroup(1, Order.ASCENDING)
-                               .reduceGroup(new 
IdentityGroupReducer<Tuple3<Integer,Integer,Integer>>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       SingleInputPlanNode reducer = (SingleInputPlanNode) 
sink.getInput().getSource();
-                       SingleInputPlanNode combiner = (SingleInputPlanNode) 
reducer.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
reducer.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.FORWARD, 
combiner.getInput().getShipStrategy());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCustomPartitioningTupleGroupReduceSorted2() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple4<Integer,Integer,Integer, Integer>> data 
= env.fromElements(new Tuple4<Integer,Integer,Integer,Integer>(0, 0, 0, 0))
-                                       .rebalance().setParallelism(4);
-                       
-                       data.groupBy(0).withPartitioner(new 
TestPartitionerInt())
-                               .sortGroup(1, Order.ASCENDING)
-                               .sortGroup(2, Order.DESCENDING)
-                               .reduceGroup(new 
IdentityGroupReducer<Tuple4<Integer,Integer,Integer,Integer>>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       SingleInputPlanNode reducer = (SingleInputPlanNode) 
sink.getInput().getSource();
-                       SingleInputPlanNode combiner = (SingleInputPlanNode) 
reducer.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
reducer.getInput().getShipStrategy());
-                       assertEquals(ShipStrategyType.FORWARD, 
combiner.getInput().getShipStrategy());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCustomPartitioningTupleInvalidType() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple2<Integer, Integer>> data = 
env.fromElements(new Tuple2<Integer, Integer>(0, 0))
-                                       .rebalance().setParallelism(4);
-                       
-                       try {
-                               data.groupBy(0).withPartitioner(new 
TestPartitionerLong());
-                               fail("Should throw an exception");
-                       }
-                       catch (InvalidProgramException e) {}
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCustomPartitioningTupleInvalidTypeSorted() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple3<Integer, Integer, Integer>> data = 
env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
-                                       .rebalance().setParallelism(4);
-                       
-                       try {
-                               data.groupBy(0)
-                                       .sortGroup(1, Order.ASCENDING)
-                                       .withPartitioner(new 
TestPartitionerLong());
-                               fail("Should throw an exception");
-                       }
-                       catch (InvalidProgramException e) {}
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCustomPartitioningTupleRejectCompositeKey() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple3<Integer, Integer, Integer>> data = 
env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
-                                       .rebalance().setParallelism(4);
-                       
-                       try {
-                               data.groupBy(0, 1)
-                                       .withPartitioner(new 
TestPartitionerInt());
-                               fail("Should throw an exception");
-                       }
-                       catch (InvalidProgramException e) {}
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       private static class TestPartitionerInt implements Partitioner<Integer> 
{
-               @Override
-               public int partition(Integer key, int numPartitions) {
-                       return 0;
-               }
-       }
-       
-       private static class TestPartitionerLong implements Partitioner<Long> {
-               @Override
-               public int partition(Long key, int numPartitions) {
-                       return 0;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
deleted file mode 100644
index eae40cf..0000000
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.custompartition;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityMapper;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.junit.Test;
-
-@SuppressWarnings({"serial", "unchecked"})
-public class JoinCustomPartitioningTest extends CompilerTestBase {
-
-       @Test
-       public void testJoinWithTuples() {
-               try {
-                       final Partitioner<Long> partitioner = new 
TestPartitionerLong();
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple2<Long, Long>> input1 = 
env.fromElements(new Tuple2<Long, Long>(0L, 0L));
-                       DataSet<Tuple3<Long, Long, Long>> input2 = 
env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
-                       
-                       input1
-                               .join(input2, 
JoinHint.REPARTITION_HASH_FIRST).where(1).equalTo(0).withPartitioner(partitioner)
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       DualInputPlanNode join = (DualInputPlanNode) 
sink.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
join.getInput1().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
join.getInput2().getShipStrategy());
-                       assertEquals(partitioner, 
join.getInput1().getPartitioner());
-                       assertEquals(partitioner, 
join.getInput2().getPartitioner());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testJoinWithTuplesWrongType() {
-               try {
-                       final Partitioner<Integer> partitioner = new 
TestPartitionerInt();
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple2<Long, Long>> input1 = 
env.fromElements(new Tuple2<Long, Long>(0L, 0L));
-                       DataSet<Tuple3<Long, Long, Long>> input2 = 
env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
-                       
-                       try {
-                               input1
-                                       .join(input2, 
JoinHint.REPARTITION_HASH_FIRST).where(1).equalTo(0)
-                                       .withPartitioner(partitioner);
-                               
-                               fail("should throw an exception");
-                       }
-                       catch (InvalidProgramException e) {
-                               // expected
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testJoinWithPojos() {
-               try {
-                       final Partitioner<Integer> partitioner = new 
TestPartitionerInt();
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Pojo2> input1 = env.fromElements(new Pojo2());
-                       DataSet<Pojo3> input2 = env.fromElements(new Pojo3());
-                       
-                       input1
-                               .join(input2, JoinHint.REPARTITION_HASH_FIRST)
-                               
.where("b").equalTo("a").withPartitioner(partitioner)
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       DualInputPlanNode join = (DualInputPlanNode) 
sink.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
join.getInput1().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
join.getInput2().getShipStrategy());
-                       assertEquals(partitioner, 
join.getInput1().getPartitioner());
-                       assertEquals(partitioner, 
join.getInput2().getPartitioner());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testJoinWithPojosWrongType() {
-               try {
-                       final Partitioner<Long> partitioner = new 
TestPartitionerLong();
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Pojo2> input1 = env.fromElements(new Pojo2());
-                       DataSet<Pojo3> input2 = env.fromElements(new Pojo3());
-                       
-                       try {
-                               input1
-                                       .join(input2, 
JoinHint.REPARTITION_HASH_FIRST)
-                                       .where("a").equalTo("b")
-                                       .withPartitioner(partitioner);
-                               
-                               fail("should throw an exception");
-                       }
-                       catch (InvalidProgramException e) {
-                               // expected
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testJoinWithKeySelectors() {
-               try {
-                       final Partitioner<Integer> partitioner = new 
TestPartitionerInt();
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Pojo2> input1 = env.fromElements(new Pojo2());
-                       DataSet<Pojo3> input2 = env.fromElements(new Pojo3());
-                       
-                       input1
-                               .join(input2, JoinHint.REPARTITION_HASH_FIRST)
-                               .where(new Pojo2KeySelector())
-                               .equalTo(new Pojo3KeySelector())
-                               .withPartitioner(partitioner)
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       DualInputPlanNode join = (DualInputPlanNode) 
sink.getInput().getSource();
-                       
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
join.getInput1().getShipStrategy());
-                       assertEquals(ShipStrategyType.PARTITION_CUSTOM, 
join.getInput2().getShipStrategy());
-                       assertEquals(partitioner, 
join.getInput1().getPartitioner());
-                       assertEquals(partitioner, 
join.getInput2().getPartitioner());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testJoinWithKeySelectorsWrongType() {
-               try {
-                       final Partitioner<Long> partitioner = new 
TestPartitionerLong();
-                       
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Pojo2> input1 = env.fromElements(new Pojo2());
-                       DataSet<Pojo3> input2 = env.fromElements(new Pojo3());
-                       
-                       try {
-                               input1
-                                       .join(input2, 
JoinHint.REPARTITION_HASH_FIRST)
-                                       .where(new Pojo2KeySelector())
-                                       .equalTo(new Pojo3KeySelector())
-                                       .withPartitioner(partitioner);
-                               
-                               fail("should throw an exception");
-                       }
-                       catch (InvalidProgramException e) {
-                               // expected
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testIncompatibleHashAndCustomPartitioning() {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       
-                       DataSet<Tuple3<Long, Long, Long>> input = 
env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
-                       
-                       DataSet<Tuple3<Long, Long, Long>> partitioned = input
-                               .partitionCustom(new Partitioner<Long>() {
-                                       @Override
-                                       public int partition(Long key, int 
numPartitions) { return 0; }
-                               }, 0)
-                               .map(new 
IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2");
-                               
-                       
-                       DataSet<Tuple3<Long, Long, Long>> grouped = partitioned
-                               .distinct(0, 1)
-                               .groupBy(1)
-                               .sortGroup(0, Order.ASCENDING)
-                               .reduceGroup(new 
IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
-                       
-                       grouped
-                               .join(partitioned, 
JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)
-                               .with(new 
DummyFlatJoinFunction<Tuple3<Long,Long,Long>>())
-                               .print();
-                       
-                       Plan p = env.createProgramPlan();
-                       OptimizedPlan op = compileNoStats(p);
-                       
-                       SinkPlanNode sink = op.getDataSinks().iterator().next();
-                       DualInputPlanNode coGroup = (DualInputPlanNode) 
sink.getInput().getSource();
-
-                       assertEquals(ShipStrategyType.PARTITION_HASH, 
coGroup.getInput1().getShipStrategy());
-                       assertTrue(coGroup.getInput2().getShipStrategy() == 
ShipStrategyType.PARTITION_HASH || 
-                                               
coGroup.getInput2().getShipStrategy() == ShipStrategyType.FORWARD);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       private static class TestPartitionerInt implements Partitioner<Integer> 
{
-               @Override
-               public int partition(Integer key, int numPartitions) {
-                       return 0;
-               }
-       }
-       
-       private static class TestPartitionerLong implements Partitioner<Long> {
-               @Override
-               public int partition(Long key, int numPartitions) {
-                       return 0;
-               }
-       }
-       
-       public static class Pojo2 {
-               public int a;
-               public int b;
-       }
-       
-       public static class Pojo3 {
-               public int a;
-               public int b;
-               public int c;
-       }
-       
-       private static class Pojo2KeySelector implements KeySelector<Pojo2, 
Integer> {
-               @Override
-               public Integer getKey(Pojo2 value) {
-                       return value.a;
-               }
-       }
-       
-       private static class Pojo3KeySelector implements KeySelector<Pojo3, 
Integer> {
-               @Override
-               public Integer getKey(Pojo3 value) {
-                       return value.b;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
deleted file mode 100644
index cb4bd78..0000000
--- 
a/flink-compiler/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dataexchange;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerTestBase;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
-import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityFlatMapper;
-import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
-import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.junit.Test;
-
-import java.util.Collection;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-/**
- * This test checks the correct assignment of the DataExchangeMode to
- * connections for programs that branch, and re-join those branches.
- *
- * <pre>
- *                                         /-> (sink)
- *                                        /
- *                         /-> (reduce) -+          /-> (flatmap) -> (sink)
- *                        /               \        /
- *     (source) -> (map) -                (join) -+-----\
- *                        \               /              \
- *                         \-> (filter) -+                \
- *                                       \                (co group) -> (sink)
- *                                        \                /
- *                                         \-> (reduce) - /
- * </pre>
- */
-@SuppressWarnings("serial")
-public class DataExchangeModeClosedBranchingTest extends CompilerTestBase {
-
-       @Test
-       public void testPipelinedForced() {
-               // PIPELINED_FORCED should result in pipelining all the way
-               verifyBranchingJoiningPlan(ExecutionMode.PIPELINED_FORCED,
-                               DataExchangeMode.PIPELINED, 
DataExchangeMode.PIPELINED,
-                               DataExchangeMode.PIPELINED, 
DataExchangeMode.PIPELINED,
-                               DataExchangeMode.PIPELINED, 
DataExchangeMode.PIPELINED,
-                               DataExchangeMode.PIPELINED, 
DataExchangeMode.PIPELINED,
-                               DataExchangeMode.PIPELINED, 
DataExchangeMode.PIPELINED,
-                               DataExchangeMode.PIPELINED, 
DataExchangeMode.PIPELINED,
-                               DataExchangeMode.PIPELINED, 
DataExchangeMode.PIPELINED);
-       }
-
-       @Test
-       public void testPipelined() {
-               // PIPELINED should result in pipelining all the way
-               verifyBranchingJoiningPlan(ExecutionMode.PIPELINED,
-                               DataExchangeMode.PIPELINED,   // to map
-                               DataExchangeMode.PIPELINED,   // to combiner 
connections are pipelined
-                               DataExchangeMode.BATCH,       // to reduce
-                               DataExchangeMode.BATCH,       // to filter
-                               DataExchangeMode.PIPELINED,   // to sink after 
reduce
-                               DataExchangeMode.PIPELINED,   // to join (first 
input)
-                               DataExchangeMode.BATCH,       // to join 
(second input)
-                               DataExchangeMode.PIPELINED,   // combiner 
connections are pipelined
-                               DataExchangeMode.BATCH,       // to other 
reducer
-                               DataExchangeMode.PIPELINED,   // to flatMap
-                               DataExchangeMode.PIPELINED,   // to sink after 
flatMap
-                               DataExchangeMode.PIPELINED,   // to coGroup 
(first input)
-                               DataExchangeMode.PIPELINED,   // to coGroup 
(second input)
-                               DataExchangeMode.PIPELINED    // to sink after 
coGroup
-               );
-       }
-
-       @Test
-       public void testBatch() {
-               // BATCH should result in batching the shuffle all the way
-               verifyBranchingJoiningPlan(ExecutionMode.BATCH,
-                               DataExchangeMode.PIPELINED,   // to map
-                               DataExchangeMode.PIPELINED,   // to combiner 
connections are pipelined
-                               DataExchangeMode.BATCH,       // to reduce
-                               DataExchangeMode.BATCH,       // to filter
-                               DataExchangeMode.PIPELINED,   // to sink after 
reduce
-                               DataExchangeMode.BATCH,       // to join (first 
input)
-                               DataExchangeMode.BATCH,       // to join 
(second input)
-                               DataExchangeMode.PIPELINED,   // combiner 
connections are pipelined
-                               DataExchangeMode.BATCH,       // to other 
reducer
-                               DataExchangeMode.PIPELINED,   // to flatMap
-                               DataExchangeMode.PIPELINED,   // to sink after 
flatMap
-                               DataExchangeMode.BATCH,       // to coGroup 
(first input)
-                               DataExchangeMode.BATCH,       // to coGroup 
(second input)
-                               DataExchangeMode.PIPELINED    // to sink after 
coGroup
-               );
-       }
-
-       @Test
-       public void testBatchForced() {
-               // BATCH_FORCED should result in batching all the way
-               verifyBranchingJoiningPlan(ExecutionMode.BATCH_FORCED,
-                               DataExchangeMode.BATCH,       // to map
-                               DataExchangeMode.PIPELINED,   // to combiner 
connections are pipelined
-                               DataExchangeMode.BATCH,       // to reduce
-                               DataExchangeMode.BATCH,       // to filter
-                               DataExchangeMode.BATCH,       // to sink after 
reduce
-                               DataExchangeMode.BATCH,       // to join (first 
input)
-                               DataExchangeMode.BATCH,       // to join 
(second input)
-                               DataExchangeMode.PIPELINED,   // combiner 
connections are pipelined
-                               DataExchangeMode.BATCH,       // to other 
reducer
-                               DataExchangeMode.BATCH,       // to flatMap
-                               DataExchangeMode.BATCH,       // to sink after 
flatMap
-                               DataExchangeMode.BATCH,       // to coGroup 
(first input)
-                               DataExchangeMode.BATCH,       // to coGroup 
(second input)
-                               DataExchangeMode.BATCH        // to sink after 
coGroup
-               );
-       }
-
-       private void verifyBranchingJoiningPlan(ExecutionMode execMode,
-                                                                               
        DataExchangeMode toMap,
-                                                                               
        DataExchangeMode toReduceCombiner,
-                                                                               
        DataExchangeMode toReduce,
-                                                                               
        DataExchangeMode toFilter,
-                                                                               
        DataExchangeMode toReduceSink,
-                                                                               
        DataExchangeMode toJoin1,
-                                                                               
        DataExchangeMode toJoin2,
-                                                                               
        DataExchangeMode toOtherReduceCombiner,
-                                                                               
        DataExchangeMode toOtherReduce,
-                                                                               
        DataExchangeMode toFlatMap,
-                                                                               
        DataExchangeMode toFlatMapSink,
-                                                                               
        DataExchangeMode toCoGroup1,
-                                                                               
        DataExchangeMode toCoGroup2,
-                                                                               
        DataExchangeMode toCoGroupSink)
-       {
-               try {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       env.getConfig().setExecutionMode(execMode);
-
-                       DataSet<Tuple2<Long, Long>> data = 
env.fromElements(33L, 44L)
-                                       .map(new MapFunction<Long, Tuple2<Long, 
Long>>() {
-                                               @Override
-                                               public Tuple2<Long, Long> 
map(Long value) {
-                                                       return new Tuple2<Long, 
Long>(value, value);
-                                               }
-                                       });
-
-                       DataSet<Tuple2<Long, Long>> reduced = 
data.groupBy(0).reduce(new SelectOneReducer<Tuple2<Long, Long>>());
-                       reduced.output(new DiscardingOutputFormat<Tuple2<Long, 
Long>>()).name("reduceSink");
-
-                       DataSet<Tuple2<Long, Long>> filtered = data.filter(new 
FilterFunction<Tuple2<Long, Long>>() {
-                               @Override
-                               public boolean filter(Tuple2<Long, Long> value) 
throws Exception {
-                                       return false;
-                               }
-                       });
-
-                       DataSet<Tuple2<Long, Long>> joined = 
reduced.join(filtered)
-                                       .where(1).equalTo(1)
-                                       .with(new 
DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-                       joined.flatMap(new IdentityFlatMapper<Tuple2<Long, 
Long>>())
-                                       .output(new 
DiscardingOutputFormat<Tuple2<Long, Long>>()).name("flatMapSink");
-
-                       joined.coGroup(filtered.groupBy(1).reduceGroup(new 
Top1GroupReducer<Tuple2<Long, Long>>()))
-                                       .where(0).equalTo(0)
-                                       .with(new 
DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
-                                       .output(new 
DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, 
Long>>>()).name("cgSink");
-
-
-                       OptimizedPlan optPlan = 
compileNoStats(env.createProgramPlan());
-
-                       SinkPlanNode reduceSink = 
findSink(optPlan.getDataSinks(), "reduceSink");
-                       SinkPlanNode flatMapSink = 
findSink(optPlan.getDataSinks(), "flatMapSink");
-                       SinkPlanNode cgSink = findSink(optPlan.getDataSinks(), 
"cgSink");
-
-                       DualInputPlanNode coGroupNode = (DualInputPlanNode) 
cgSink.getPredecessor();
-
-                       DualInputPlanNode joinNode = (DualInputPlanNode) 
coGroupNode.getInput1().getSource();
-                       SingleInputPlanNode otherReduceNode = 
(SingleInputPlanNode) coGroupNode.getInput2().getSource();
-                       SingleInputPlanNode otherReduceCombinerNode = 
(SingleInputPlanNode) otherReduceNode.getPredecessor();
-
-                       SingleInputPlanNode reduceNode = (SingleInputPlanNode) 
joinNode.getInput1().getSource();
-                       SingleInputPlanNode reduceCombinerNode = 
(SingleInputPlanNode) reduceNode.getPredecessor();
-                       assertEquals(reduceNode, reduceSink.getPredecessor());
-
-                       SingleInputPlanNode filterNode = (SingleInputPlanNode) 
joinNode.getInput2().getSource();
-                       assertEquals(filterNode, 
otherReduceCombinerNode.getPredecessor());
-
-                       SingleInputPlanNode mapNode = (SingleInputPlanNode) 
filterNode.getPredecessor();
-                       assertEquals(mapNode, 
reduceCombinerNode.getPredecessor());
-
-                       SingleInputPlanNode flatMapNode = (SingleInputPlanNode) 
flatMapSink.getPredecessor();
-                       assertEquals(joinNode, flatMapNode.getPredecessor());
-
-                       // verify the data exchange modes
-
-                       assertEquals(toReduceSink, 
reduceSink.getInput().getDataExchangeMode());
-                       assertEquals(toFlatMapSink, 
flatMapSink.getInput().getDataExchangeMode());
-                       assertEquals(toCoGroupSink, 
cgSink.getInput().getDataExchangeMode());
-
-                       assertEquals(toCoGroup1, 
coGroupNode.getInput1().getDataExchangeMode());
-                       assertEquals(toCoGroup2, 
coGroupNode.getInput2().getDataExchangeMode());
-
-                       assertEquals(toJoin1, 
joinNode.getInput1().getDataExchangeMode());
-                       assertEquals(toJoin2, 
joinNode.getInput2().getDataExchangeMode());
-
-                       assertEquals(toOtherReduce, 
otherReduceNode.getInput().getDataExchangeMode());
-                       assertEquals(toOtherReduceCombiner, 
otherReduceCombinerNode.getInput().getDataExchangeMode());
-
-                       assertEquals(toFlatMap, 
flatMapNode.getInput().getDataExchangeMode());
-
-                       assertEquals(toFilter, 
filterNode.getInput().getDataExchangeMode());
-                       assertEquals(toReduce, 
reduceNode.getInput().getDataExchangeMode());
-                       assertEquals(toReduceCombiner, 
reduceCombinerNode.getInput().getDataExchangeMode());
-
-                       assertEquals(toMap, 
mapNode.getInput().getDataExchangeMode());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       private SinkPlanNode findSink(Collection<SinkPlanNode> collection, 
String name) {
-               for (SinkPlanNode node : collection) {
-                       String nodeName = 
node.getOptimizerNode().getOperator().getName();
-                       if (nodeName != null && nodeName.equals(name)) {
-                               return node;
-                       }
-               }
-
-               throw new IllegalArgumentException("No node with that name was 
found.");
-       }
-}

Reply via email to