http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
index f42eb02..41e0eb9 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -52,7 +53,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                                        .where(0,1).equalTo(0,1).with(new MockJoin());
 
-               joined.print();
+               joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -73,7 +74,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                                .where(0,1).equalTo(2,1).with(new MockJoin());
 
-               joined.print();
+               joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -96,7 +97,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                                .where(0,1).equalTo(0,1).with(new MockJoin());
 
-               joined.print();
+               joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -119,7 +120,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                                .where(0,1).equalTo(2,1).with(new MockJoin());
 
-               joined.print();
+               joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -142,7 +143,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                                JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                                .where(0,1).equalTo(2,1).with(new MockJoin());
 
-               joined.print();
+               joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -164,7 +165,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                .join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                                .where(0,1).equalTo(2,1).with(new MockJoin());
 
-               joined.print();
+               joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -187,7 +188,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                                JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                                .where(0,1).equalTo(2,1).with(new MockJoin());
 
-               joined.print();
+               joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -212,7 +213,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                                JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                                .where(0,1).equalTo(0,1).with(new MockJoin());
 
-               joined.print();
+               joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -238,7 +239,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                                JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                                .where(0,1).equalTo(2,1).with(new MockJoin());
 
-               joined.print();
+               joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -263,7 +264,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                                JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                                .where(0,1).equalTo(2,1).with(new MockJoin());
 
-               joined.print();
+               joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -288,7 +289,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                                JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                                .where(0,2).equalTo(2,1).with(new MockJoin());
 
-               joined.print();
+               joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -313,7 +314,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                                JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                                .where(0,2).equalTo(2,1).with(new MockJoin());
 
-               joined.print();
+               joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -338,7 +339,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                                JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                                .where(0,2).equalTo(1,2).with(new MockJoin());
 
-               joined.print();
+               joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -363,7 +364,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                                JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
                                .where(0,2).equalTo(1,2).with(new MockJoin());
 
-               joined.print();
+               joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -384,7 +385,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                .coGroup(set2)
                                .where(0,1).equalTo(0,1).with(new MockCoGroup());
 
-               coGrouped.print();
+               coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -405,7 +406,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                .coGroup(set2)
                                .where(0,1).equalTo(2,1).with(new MockCoGroup());
 
-               coGrouped.print();
+               coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -428,7 +429,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                .coGroup(set2)
                                .where(0,1).equalTo(0,1).with(new MockCoGroup());
 
-               coGrouped.print();
+               coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -451,7 +452,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                .coGroup(set2)
                                .where(0,1).equalTo(2,1).with(new MockCoGroup());
 
-               coGrouped.print();
+               coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -473,7 +474,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                                                .withForwardedFields("2;1"))
                                .where(0,1).equalTo(2, 1).with(new MockCoGroup());
 
-               coGrouped.print();
+               coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -495,7 +496,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                .coGroup(set2)
                                .where(0, 1).equalTo(2, 1).with(new MockCoGroup());
 
-               coGrouped.print();
+               coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -517,7 +518,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                                                .withForwardedFields("2"))
                                .where(0,1).equalTo(2,1).with(new MockCoGroup());
 
-               coGrouped.print();
+               coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -541,7 +542,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                                .withForwardedFields("0;1"))
                                .where(0, 1).equalTo(0, 1).with(new MockCoGroup());
 
-               coGrouped.print();
+               coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -566,7 +567,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                                .withForwardedFields("1;2"))
                                .where(0, 1).equalTo(2, 1).with(new MockCoGroup());
 
-               coGrouped.print();
+               coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -590,7 +591,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                                .withForwardedFields("2;1"))
                                .where(0, 1).equalTo(2, 1).with(new MockCoGroup());
 
-               coGrouped.print();
+               coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -614,7 +615,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                                .withForwardedFields("1"))
                                .where(0, 2).equalTo(2, 1).with(new MockCoGroup());
 
-               coGrouped.print();
+               coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -638,7 +639,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                                .withForwardedFields("1"))
                                .where(0, 2).equalTo(2, 1).with(new MockCoGroup());
 
-               coGrouped.print();
+               coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -662,7 +663,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                                .withForwardedFields("2"))
                                .where(0, 2).equalTo(1, 2).with(new MockCoGroup());
 
-               coGrouped.print();
+               coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -686,7 +687,7 @@ public class PartitioningReusageTest extends CompilerTestBase {
                                                .withForwardedFields("1"))
                                .where(0, 2).equalTo(1, 2).with(new MockCoGroup());
 
-               coGrouped.print();
+               coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
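
Note: the change repeated throughout this file replaces the eager print() sink
with DataSet#output(...) and a DiscardingOutputFormat, so the test can still
build a plan via env.createProgramPlan() and hand it to the optimizer without
requesting eager execution. A minimal sketch of the pattern outside the test
harness (class name and sequence source are illustrative, not from the patch):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;

    public class DiscardingSinkSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Long> data = env.generateSequence(0, 100);

            // A sink that drops every record. Unlike print(), it keeps the
            // program lazy, so optimizer tests can compile the plan returned
            // by env.createProgramPlan() instead of running the job.
            data.output(new DiscardingOutputFormat<Long>());
        }
    }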
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
index 84f6377..68e8a41 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.optimizer;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
 import org.apache.flink.optimizer.util.CompilerTestBase;
@@ -50,7 +52,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
                                                                                .map(new IdentityMapper<Long>())
                                                                                        .withBroadcastSet(source, "bc");
                        
-                       result.print();
+                       result.output(new DiscardingOutputFormat<Long>());
                        
                        Plan p = env.createProgramPlan();
                        OptimizedPlan op = compileNoStats(p);
@@ -84,7 +86,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
                                                        .withBroadcastSet(bcInput1, "bc1")
                                                        .withBroadcastSet(bcInput2, "bc2");
                        
-                       result.print();
+                       result.output(new DiscardingOutputFormat<Long>());
                        
                        Plan p = env.createProgramPlan();
                        OptimizedPlan op = compileNoStats(p);
@@ -123,7 +125,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
                                                        .withBroadcastSet(bcInput1, "bc1");
                                                        
                        
-                       iteration.closeWith(result).print();
+                       iteration.closeWith(result).output(new DiscardingOutputFormat<Long>());
                        
                        Plan p = env.createProgramPlan();
                        OptimizedPlan op = compileNoStats(p);
@@ -154,7 +156,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
                                initialSource
                                        .map(new IdentityMapper<Long>())
                                        .cross(initialSource).withParameters(conf)
-                                       .print();
+                                       .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
                                
                                
                                Plan p = env.createProgramPlan();
@@ -176,7 +178,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
                                initialSource
                                        .map(new IdentityMapper<Long>())
                                        .cross(initialSource).withParameters(conf)
-                                       .print();
+                                       .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
                                
                                
                                Plan p = env.createProgramPlan();
@@ -199,7 +201,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
                                initialSource
                                        .map(new IdentityMapper<Long>())
                                        .cross(initialSource).withParameters(conf)
-                                       .print();
+                                       .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
                                
                                
                                Plan p = env.createProgramPlan();
@@ -222,7 +224,7 @@ public class PipelineBreakerTest extends CompilerTestBase {
                                initialSource
                                        .map(new IdentityMapper<Long>())
                                        .cross(initialSource).withParameters(conf)
-                                       .print();
+                                       .output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
                                
                                
                                Plan p = env.createProgramPlan();
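
Note: the broadcast sets wired up above via withBroadcastSet(...) are what the
pipeline-breaker logic under test has to work around. For orientation, a
self-contained sketch of the broadcast-set mechanism (all names illustrative,
assuming the standard RichMapFunction/broadcast-variable API):

    import java.util.List;

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;
    import org.apache.flink.configuration.Configuration;

    public class BroadcastSetSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Long> bcData = env.generateSequence(1, 10);

            env.generateSequence(1, 100)
                .map(new RichMapFunction<Long, Long>() {
                    private long offset;

                    @Override
                    public void open(Configuration parameters) {
                        // Reads the data set registered under "bc" below; it is
                        // shipped to every parallel instance of this function.
                        List<Long> bc = getRuntimeContext().getBroadcastVariable("bc");
                        offset = bc.get(0);
                    }

                    @Override
                    public Long map(Long value) {
                        return value + offset;
                    }
                })
                .withBroadcastSet(bcData, "bc")
                .output(new DiscardingOutputFormat<Long>());

            env.execute();
        }
    }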

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
index d6b9444..dc9f2e5 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -64,7 +65,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                data.getSplitDataProperties()
                                .splitsPartitionedBy(0);
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple2<Long,String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -97,7 +98,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                data.getSplitDataProperties()
                                .splitsPartitionedBy(1, 0);
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple2<Long,String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -129,7 +130,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                data.getSplitDataProperties()
                                .splitsPartitionedBy("*");
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple3<Long,SomePojo,String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -161,7 +162,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                data.getSplitDataProperties()
                                .splitsPartitionedBy("f1");
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple3<Long,SomePojo,String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -193,7 +194,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                data.getSplitDataProperties()
                                .splitsPartitionedBy("f1.stringField");
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -225,7 +226,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                data.getSplitDataProperties()
                                .splitsPartitionedBy("f1.intField; f2");
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -258,7 +259,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                data.getSplitDataProperties()
                                .splitsPartitionedBy("byDate", 1, 0);
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -293,7 +294,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                                .splitsPartitionedBy(0)
                                .splitsGroupedBy(0);
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -327,7 +328,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                                .splitsPartitionedBy(0)
                                .splitsGroupedBy(1, 0);
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -362,7 +363,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                                .splitsPartitionedBy(1)
                                .splitsGroupedBy(0);
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -396,7 +397,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                                .splitsPartitionedBy(0, 1)
                                .splitsGroupedBy(0);
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -429,7 +430,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                                .splitsPartitionedBy("f2")
                                .splitsGroupedBy("f2");
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -463,7 +464,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                                .splitsPartitionedBy("f1.intField")
                                .splitsGroupedBy("f0; f1.intField");
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -497,7 +498,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                                .splitsPartitionedBy("f1.intField")
                                .splitsGroupedBy("f1");
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -530,7 +531,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                                .splitsPartitionedBy("f1")
                                .splitsGroupedBy("f1.stringField");
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -565,7 +566,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                                .splitsPartitionedBy(1)
                                .splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING});
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -599,7 +600,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                                .splitsPartitionedBy(1)
                                .splitsOrderedBy(new int[]{1, 0}, new Order[]{Order.ASCENDING, Order.DESCENDING});
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -634,7 +635,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                                .splitsPartitionedBy(0)
                                .splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING});
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -668,7 +669,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                                .splitsPartitionedBy(0, 1)
                                .splitsOrderedBy(new int[]{1}, new Order[]{Order.DESCENDING});
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -701,7 +702,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                        .splitsPartitionedBy("f1.intField")
                        .splitsOrderedBy("f0; f1.intField", new Order[]{Order.ASCENDING, Order.DESCENDING});
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -735,7 +736,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                                .splitsPartitionedBy("f1.intField")
                                .splitsOrderedBy("f1", new Order[]{Order.DESCENDING});
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -768,7 +769,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                                .splitsPartitionedBy("f1")
                                .splitsOrderedBy("f1.stringField", new Order[]{Order.ASCENDING});
 
-               data.print();
+               data.output(new DiscardingOutputFormat<Tuple3<Long, SomePojo, String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -808,7 +809,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                data2.getSplitDataProperties()
                                .splitsPartitionedBy("byDate", 0);
 
-               data1.union(data2).print();
+               data1.union(data2).output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
                JavaPlan plan = env.createProgramPlan();
 
@@ -856,7 +857,7 @@ public class PropertyDataSourceTest extends CompilerTestBase {
                data2.getSplitDataProperties()
                                .splitsPartitionedBy("byDate", 0);
 
-               data1.union(data2).print();
+               data1.union(data2).output(new DiscardingOutputFormat<Tuple2<Long, String>>());
 
                JavaPlan plan = env.createProgramPlan();
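
Note: every hunk in this file follows the same shape: declare properties of
the source's input splits through getSplitDataProperties(), then attach a
discarding sink so the plan compiles without executing. Roughly (input path
and types illustrative):

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class SplitPropertiesSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSource<Tuple2<Long, String>> data = env
                    .readCsvFile("/tmp/input.csv")            // illustrative path
                    .types(Long.class, String.class);

            // Declare that the input splits are already partitioned on field 0,
            // so the optimizer may skip re-partitioning for operations keyed
            // on that field.
            data.getSplitDataProperties()
                .splitsPartitionedBy(0);

            data.output(new DiscardingOutputFormat<Tuple2<Long, String>>());
        }
    }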
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
index 3af64fc..b0dca66 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
@@ -20,36 +20,35 @@ package org.apache.flink.optimizer;
 
 import static org.junit.Assert.fail;
 
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityReduce;
 import org.junit.Test;
 
 /**
  * This test case has been created to validate a bug that occurred when
  * the ReduceOperator was used without a grouping key.
  */
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
 public class ReduceAllTest extends CompilerTestBase {
 
        @Test
        public void testReduce() {
                // construct the plan
-               FileDataSource source = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source");
-               ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce()).name("Reduce1").input(source).build();
-               FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, "Sink");
-               sink.setInput(reduce1);
-               Plan plan = new Plan(sink, "AllReduce Test");
-               plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-               
-               
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(DEFAULT_PARALLELISM);
+               DataSet<Long> set1 = env.generateSequence(0,1);
+
+               set1.reduceGroup(new IdentityGroupReducer<Long>()).name("Reduce1")
+                               .output(new DiscardingOutputFormat<Long>()).name("Sink");
+
+               JavaPlan plan = env.createProgramPlan();
+
                try {
                        OptimizedPlan oPlan = compileNoStats(plan);
                        JobGraphGenerator jobGen = new JobGraphGenerator();
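
Note: as the class comment in the hunk above says, this test covers a group
reduce without a grouping key. Written out against the DataSet API with an
inline identity reducer instead of the IdentityGroupReducer test helper
(illustrative sketch):

    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;
    import org.apache.flink.util.Collector;

    public class AllReduceSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Long> input = env.generateSequence(0, 1);

            // reduceGroup without a preceding groupBy() treats the whole
            // DataSet as a single group -- the "all reduce" case under test.
            input.reduceGroup(new GroupReduceFunction<Long, Long>() {
                    @Override
                    public void reduce(Iterable<Long> values, Collector<Long> out) {
                        for (Long value : values) {
                            out.collect(value);
                        }
                    }
                })
                .output(new DiscardingOutputFormat<Long>());

            env.execute();
        }
    }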

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
index 25643a4..26af380 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
@@ -44,7 +44,7 @@ import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial", "unchecked"})
 public class ReplicatingDataSourceTest extends CompilerTestBase {
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
index 3a24ce1..00ada2a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -57,7 +58,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {
                                .groupBy(1)
                                .reduce(new MockReducer()).withForwardedFields("*");
 
-               set.print();
+               set.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
 
@@ -118,7 +119,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {
                                .reduce(new MockReducer()).withForwardedFields("f1->f2");
                DataSet<Tuple3<Integer, Integer, Integer>> out = in1.join(in2).where(1).equalTo(2).with(new MockJoin());
 
-               out.print();
+               out.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
                JavaPlan plan = env.createProgramPlan();
                OptimizedPlan oPlan = compileWithStats(plan);
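
Note: both tests above hinge on forwarded-field annotations ("*", "f1->f2")
that tell the optimizer which fields a UDF copies to its output unchanged, so
properties such as partitioning survive the operator. A small self-contained
sketch of the same mechanism (types and field choices illustrative):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class ForwardedFieldsSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            env.fromElements(new Tuple2<Long, Long>(1L, 10L), new Tuple2<Long, Long>(2L, 20L))
                // Field 0 passes through unmodified and the annotation says
                // so; a partitioning on field 0 established before this map
                // therefore stays valid afterwards.
                .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
                        return new Tuple2<Long, Long>(value.f0, value.f1 + 1);
                    }
                }).withForwardedFields("0")
                .groupBy(0)
                .sum(1)
                .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

            env.execute();
        }
    }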
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
index 65e5025..a94f845 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;
@@ -30,7 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -51,12 +52,12 @@ public class SortPartialReuseTest extends CompilerTestBase {
                                .map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
                                
                                .groupBy(0, 1)
-                               .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
+                               .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
                                
                                .groupBy(0)
-                               .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
-                               
-                               .print();
+                               .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())
+
+                               .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
                        
                        Plan p = env.createProgramPlan();
                        OptimizedPlan op = compileNoStats(p);
@@ -96,12 +97,12 @@ public class SortPartialReuseTest extends CompilerTestBase {
                                .map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
                                
                                .groupBy(0, 1)
-                               .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
+                               .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1", "2")
                                
                                .groupBy(1)
-                               .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
-                               
-                               .print();
+                               .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())
+
+                               .output(new DiscardingOutputFormat<Tuple3<Long, Long, Long>>());
                        
                        Plan p = env.createProgramPlan();
                        OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
index 1001626..f041b2a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.optimizer.plan.BinaryUnionPlanNode;
 import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
@@ -50,8 +51,8 @@ public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase {
                        DataSet<Long> result = iteration.closeWith(
                                        input2.union(input2).union(iteration.union(iteration)));
                                
-                       result.print();
-                       result.print();
+                       result.output(new DiscardingOutputFormat<Long>());
+                       result.output(new DiscardingOutputFormat<Long>());
                        
                        Plan p = env.createProgramPlan();
                        OptimizedPlan op = compileNoStats(p);
@@ -102,8 +103,8 @@ public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase {
                        DataSet<Long> iterResult = iteration
                                .closeWith(iteration.union(iteration).union(input2.union(input2)));
                        
-                       iterResult.print();
-                       iterResult.print();
+                       iterResult.output(new DiscardingOutputFormat<Long>());
+                       iterResult.output(new DiscardingOutputFormat<Long>());
                        
                        
                        Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
index 2e52565..fee6e17 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
@@ -26,13 +26,12 @@ import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.types.IntValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Visitor;
 import org.junit.Assert;
@@ -45,37 +44,26 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.PlanNode;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityReduce;
 
 
-@SuppressWarnings({"serial", "deprecation"})
+@SuppressWarnings({"serial"})
 public class UnionPropertyPropagationTest extends CompilerTestBase {
 
-       @SuppressWarnings("unchecked")
        @Test
-       public void testUnionPropertyOldApiPropagation() {
+       public void testUnion1() {
                // construct the plan
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(DEFAULT_PARALLELISM);
+               DataSet<Long> sourceA = env.generateSequence(0,1);
+               DataSet<Long> sourceB = env.generateSequence(0,1);
 
-               FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), IN_FILE);
-               FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), IN_FILE);
-               
-               ReduceOperator redA = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-                       .input(sourceA)
-                       .build();
-               ReduceOperator redB = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
-                       .input(sourceB)
-                       .build();
-               
-               ReduceOperator globalRed = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0).build();
-               globalRed.addInput(redA);
-               globalRed.addInput(redB);
-               
-               FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, globalRed);
-               
-               // return the plan
-               Plan plan = new Plan(sink, "Union Property Propagation");
+               DataSet<Long> redA = sourceA.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>());
+               DataSet<Long> redB = sourceB.groupBy("*").reduceGroup(new IdentityGroupReducer<Long>());
+
+               redA.union(redB).groupBy("*").reduceGroup(new IdentityGroupReducer<Long>())
+                       .output(new DiscardingOutputFormat<Long>());
+
+               JavaPlan plan = env.createProgramPlan();
                
                OptimizedPlan oPlan = compileNoStats(plan);
                
@@ -88,7 +76,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
                        
                        @Override
                        public boolean preVisit(PlanNode visitable) {
-                               if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof ReduceOperator) {
+                               if (visitable instanceof SingleInputPlanNode && visitable.getProgramOperator() instanceof GroupReduceOperatorBase) {
                                        for (Channel inConn : visitable.getInputs()) {
                                                Assert.assertTrue("Reduce should just forward the input if it is already partitioned",
                                                                inConn.getShipStrategy() == ShipStrategyType.FORWARD); 
@@ -107,7 +95,7 @@ public class UnionPropertyPropagationTest extends CompilerTestBase {
        }
        
        @Test
-       public void testUnionNewApiAssembly() {
+       public void testUnion2() {
                final int NUM_INPUTS = 4;
                
                // construct the plan it will be multiple flat maps, all unioned

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
index e81e0ec..65dd2b3 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.optimizer;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.util.CompilerTestBase;
@@ -40,8 +41,8 @@ public class UnionReplacementTest extends CompilerTestBase {
        
                        DataSet<String> union = input1.union(input2);
        
-                       union.print();
-                       union.print();
+                       union.output(new DiscardingOutputFormat<String>());
+                       union.output(new DiscardingOutputFormat<String>());
        
                        Plan plan = env.createProgramPlan();
                        OptimizedPlan oPlan = compileNoStats(plan);

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
index 321ca5a..32bd6e9 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
@@ -24,11 +24,13 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
@@ -40,12 +42,13 @@ public class WorksetIterationCornerCasesTest extends CompilerTestBase {
                try {
                        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                        
-                       DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 100).map(new Duplicator<Long>());
+                       DataSet<Tuple2<Long, Long>> input = env.readCsvFile("/tmp/some.csv").types(Long.class, Long.class);
                        
                        DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = input.iterateDelta(input, 100, 1);
                        
-                       DataSet<Tuple2<Long, Long>> iterEnd = iteration.getWorkset().map(new TestMapper<Tuple2<Long,Long>>());
-                       iteration.closeWith(iterEnd, iterEnd).print();
+                       DataSet<Tuple2<Long, Long>> iterEnd = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>());
+                       iteration.closeWith(iterEnd, iterEnd)
+                                       .output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
                        
                        Plan p = env.createProgramPlan();
                        OptimizedPlan op = compileNoStats(p);
@@ -61,18 +64,5 @@ public class WorksetIterationCornerCasesTest extends CompilerTestBase {
                        fail(e.getMessage());
                }
        }
-       
-       private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
-               @Override
-               public Tuple2<T, T> map(T value) {
-                       return new Tuple2<T, T>(value, value);
-               }
-       }
-       
-       private static final class TestMapper<T> implements MapFunction<T, T> {
-               @Override
-               public T map(T value) {
-                       return value;
-               }
-       }
+
 }
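
Note: the corner case above closes a delta iteration with the same DataSet as
both the solution-set delta and the next workset. The delta-iteration skeleton,
reduced to its moving parts (key field and iteration limit as in the test, the
input and mapper illustrative):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;
    import org.apache.flink.api.java.operators.DeltaIteration;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class DeltaIterationSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<Long, Long>> input = env.fromElements(
                    new Tuple2<Long, Long>(1L, 1L), new Tuple2<Long, Long>(2L, 2L));

            // Solution set and initial workset are both 'input'; iterate at
            // most 100 times, keyed on tuple field 1 (as in the test).
            DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
                    input.iterateDelta(input, 100, 1);

            DataSet<Tuple2<Long, Long>> step = iteration.getWorkset()
                    .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                        @Override
                        public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
                            return value;
                        }
                    });

            // The same intermediate result feeds back as solution-set delta
            // and as next workset -- exactly the case the test validates.
            iteration.closeWith(step, step)
                    .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

            env.execute();
        }
    }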

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
index 27f367f..46b9357 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
@@ -25,25 +25,21 @@ import static org.junit.Assert.fail;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.JoinOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyMatchStub;
-import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityJoiner;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.types.LongValue;
 import org.junit.Test;
 
 
@@ -51,7 +47,6 @@ import org.junit.Test;
 * Tests that validate optimizer choices when using operators that are 
requesting certain specific execution
 * strategies.
 */
-@SuppressWarnings("deprecation")
 public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
        
        private static final long serialVersionUID = 1L;
@@ -66,7 +61,7 @@ public class WorksetIterationsRecordApiCompilerTest extends 
CompilerTestBase {
 
        @Test
        public void testRecordApiWithDeferredSoltionSetUpdateWithMapper() {
-               Plan plan = getRecordTestPlan(false, true);
+               Plan plan = getTestPlan(false, true);
                
                OptimizedPlan oPlan;
                try {
@@ -112,7 +107,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
        
        @Test
	public void testRecordApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() {
-               Plan plan = getRecordTestPlan(false, false);
+               Plan plan = getTestPlan(false, false);
                
                OptimizedPlan oPlan;
                try {
@@ -156,7 +151,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
        
        @Test
        public void testRecordApiWithDirectSoltionSetUpdate() {
-               Plan plan = getRecordTestPlan(true, false);
+               Plan plan = getTestPlan(true, false);
                
                OptimizedPlan oPlan;
                try {
@@ -197,52 +192,45 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
                new JobGraphGenerator().compileJobGraph(oPlan);
        }
        
-	private Plan getRecordTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
-		FileDataSource solutionSetInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Solution Set");
-		FileDataSource worksetInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Workset");
-		
-		FileDataSource invariantInput = new FileDataSource(new DummyInputFormat(), IN_FILE, "Invariant Input");
-		
-		DeltaIteration iteration = new DeltaIteration(0, ITERATION_NAME);
-		iteration.setInitialSolutionSet(solutionSetInput);
-		iteration.setInitialWorkset(worksetInput);
-		iteration.setMaximumNumberOfIterations(100);
-
-		JoinOperator joinWithInvariant = JoinOperator.builder(new DummyMatchStub(), LongValue.class, 0, 0)
-				.input1(iteration.getWorkset())
-				.input2(invariantInput)
-				.name(JOIN_WITH_INVARIANT_NAME)
-				.build();
-
-		JoinOperator joinWithSolutionSet = JoinOperator.builder(
-				joinPreservesSolutionSet ? new DummyMatchStub() : new DummyNonPreservingMatchStub(), LongValue.class, 0, 0)
-				.input1(iteration.getSolutionSet())
-				.input2(joinWithInvariant)
-				.name(JOIN_WITH_SOLUTION_SET)
-				.build();
-		
-		ReduceOperator nextWorkset = ReduceOperator.builder(new IdentityReduce(), LongValue.class, 0)
-				.input(joinWithSolutionSet)
-				.name(NEXT_WORKSET_REDUCER_NAME)
-				.build();
-		
-		if (mapBeforeSolutionDelta) {
-			MapOperator mapper = MapOperator.builder(new IdentityMap())
-				.input(joinWithSolutionSet)
-				.name(SOLUTION_DELTA_MAPPER_NAME)
-				.build();
-			iteration.setSolutionSetDelta(mapper);
-		} else {
-			iteration.setSolutionSetDelta(joinWithSolutionSet);
+	private Plan getTestPlan(boolean joinPreservesSolutionSet, boolean mapBeforeSolutionDelta) {
+
+		// construct the plan
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		DataSet<Tuple2<Long, Long>> solSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Solution Set");
+		DataSet<Tuple2<Long, Long>> workSetInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Workset");
+		DataSet<Tuple2<Long, Long>> invariantInput = env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class).name("Invariant Input");
+
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> deltaIt = solSetInput.iterateDelta(workSetInput, 100, 0).name(ITERATION_NAME);
+
+		DataSet<Tuple2<Long, Long>> join1 = deltaIt.getWorkset().join(invariantInput).where(0).equalTo(0)
+				.with(new IdentityJoiner<Tuple2<Long, Long>>())
+				.withForwardedFieldsFirst("*").name(JOIN_WITH_INVARIANT_NAME);
+
+		DataSet<Tuple2<Long, Long>> join2 = deltaIt.getSolutionSet().join(join1).where(0).equalTo(0)
+				.with(new IdentityJoiner<Tuple2<Long, Long>>())
+				.name(JOIN_WITH_SOLUTION_SET);
+		if(joinPreservesSolutionSet) {
+			((JoinOperator<?,?,?>)join2).withForwardedFieldsFirst("*");
 		}
-		
-		iteration.setNextWorkset(nextWorkset);
 
-		FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, iteration, "Sink");
-		
-		Plan plan = new Plan(sink);
-		plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-		return plan;
+		DataSet<Tuple2<Long, Long>> nextWorkset = join2.groupBy(0).reduceGroup(new IdentityGroupReducer<Tuple2<Long, Long>>())
+				.withForwardedFields("*").name(NEXT_WORKSET_REDUCER_NAME);
+
+		if(mapBeforeSolutionDelta) {
+
+			DataSet<Tuple2<Long, Long>> mapper = join2.map(new IdentityMapper<Tuple2<Long, Long>>())
+					.withForwardedFields("*").name(SOLUTION_DELTA_MAPPER_NAME);
+
+			deltaIt.closeWith(mapper, nextWorkset)
+					.output(new DiscardingOutputFormat<Tuple2<Long,Long>>());
+		}
+		else {
+			deltaIt.closeWith(join2, nextWorkset)
+					.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+		}
+
+		return env.createProgramPlan();
        }
 }
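
Taken as a whole, the hunk above migrates the test-plan construction from the deprecated Record API to the DataSet API: sources become CsvReader-backed DataSets, the delta iteration is expressed with iterateDelta()/closeWith(), and the plan is terminated with a DiscardingOutputFormat sink that writes nothing, so createProgramPlan() still sees a complete plan. The following is a minimal, self-contained sketch of that same pattern; the input paths and the identity delta/next-workset are hypothetical placeholders, not part of this commit:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;
    import org.apache.flink.api.java.operators.DeltaIteration;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class DeltaIterationPlanSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // hypothetical CSV sources; any DataSet<Tuple2<Long, Long>> would do
            DataSet<Tuple2<Long, Long>> solutionSet =
                    env.readCsvFile("/tmp/sol.csv").types(Long.class, Long.class);
            DataSet<Tuple2<Long, Long>> workset =
                    env.readCsvFile("/tmp/work.csv").types(Long.class, Long.class);

            // at most 100 iterations, solution set keyed on tuple field 0
            DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
                    solutionSet.iterateDelta(workset, 100, 0);

            // a real job derives these from joins against getSolutionSet();
            // the plain workset stands in here to keep the sketch compilable
            DataSet<Tuple2<Long, Long>> delta = iteration.getWorkset();
            DataSet<Tuple2<Long, Long>> nextWorkset = iteration.getWorkset();

            // close the iteration and end the plan in a sink that discards
            // all records, as the updated tests do
            iteration.closeWith(delta, nextWorkset)
                    .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

            env.execute("delta iteration sketch");
        }
    }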
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
index d52181d..346e702 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
@@ -246,7 +246,7 @@ public class CoGroupCustomPartitioningTest extends CompilerTestBase {
                                .distinct(0, 1)
                                .groupBy(1)
                                .sortGroup(0, Order.ASCENDING)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
                        
                        grouped
                                .coGroup(partitioned).where(0).equalTo(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
index 5758c86..17a7659 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
@@ -57,7 +57,7 @@ public class CustomPartitioningGlobalOptimizationTest extends CompilerTestBase {
                                .withPartitioner(partitioner);
 
                        joined.groupBy(1).withPartitioner(partitioner)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>())
                                .print();
 
                        Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
index 0408ca9..23f4812 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
@@ -84,7 +84,7 @@ public class GroupingKeySelectorTranslationTest extends CompilerTestBase {
 			
 			data.groupBy(new TestKeySelector<Tuple2<Integer,Integer>>())
 				.withPartitioner(new TestPartitionerInt())
-				.reduceGroup(new IdentityGroupReducer<Tuple2<Integer,Integer>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
 				.print();
                        
                        Plan p = env.createProgramPlan();
@@ -115,7 +115,7 @@ public class GroupingKeySelectorTranslationTest extends CompilerTestBase {
 			data.groupBy(new TestKeySelector<Tuple3<Integer,Integer,Integer>>())
 				.withPartitioner(new TestPartitionerInt())
 				.sortGroup(new TestKeySelector<Tuple3<Integer, Integer, Integer>>(), Order.ASCENDING)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Integer,Integer,Integer>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>())
 				.print();
                        
                        Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
index 74e5c8c..54033ac 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
@@ -76,7 +76,7 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
 					.rebalance().setParallelism(4);
 			
 			data.groupBy("a").withPartitioner(new TestPartitionerInt())
-				.reduceGroup(new IdentityGroupReducer<Pojo2>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Pojo2>())
 				.print();
                        
                        Plan p = env.createProgramPlan();
@@ -106,7 +106,7 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
 			
 			data.groupBy("a").withPartitioner(new TestPartitionerInt())
 				.sortGroup("b", Order.ASCENDING)
-				.reduceGroup(new IdentityGroupReducer<Pojo3>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Pojo3>())
 				.print();
                        
                        Plan p = env.createProgramPlan();
@@ -137,7 +137,7 @@ public class GroupingPojoTranslationTest extends CompilerTestBase {
 			data.groupBy("a").withPartitioner(new TestPartitionerInt())
 				.sortGroup("b", Order.ASCENDING)
 				.sortGroup("c", Order.DESCENDING)
-				.reduceGroup(new IdentityGroupReducer<Pojo4>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Pojo4>())
 				.print();
                        
                        Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
index 72fb81b..49f44f5 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
@@ -108,7 +108,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
 					.rebalance().setParallelism(4);
 			
 			data.groupBy(0).withPartitioner(new TestPartitionerInt())
-				.reduceGroup(new IdentityGroupReducer<Tuple2<Integer,Integer>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer,Integer>>())
 				.print();
                        
                        Plan p = env.createProgramPlan();
@@ -138,7 +138,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
 			
 			data.groupBy(0).withPartitioner(new TestPartitionerInt())
 				.sortGroup(1, Order.ASCENDING)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Integer,Integer,Integer>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer,Integer,Integer>>())
 				.print();
                        
                        Plan p = env.createProgramPlan();
@@ -169,7 +169,7 @@ public class GroupingTupleTranslationTest extends CompilerTestBase {
 			data.groupBy(0).withPartitioner(new TestPartitionerInt())
 				.sortGroup(1, Order.ASCENDING)
 				.sortGroup(2, Order.DESCENDING)
-				.reduceGroup(new IdentityGroupReducer<Tuple4<Integer,Integer,Integer,Integer>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple4<Integer,Integer,Integer,Integer>>())
 				.print();
                        
                        Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
index 8eedee1..ff429b8 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
@@ -243,7 +243,7 @@ public class JoinCustomPartitioningTest extends CompilerTestBase {
                                .distinct(0, 1)
                                .groupBy(1)
                                .sortGroup(0, Order.ASCENDING)
-				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Long,Long,Long>>()).withForwardedFields("0", "1");
 			
 			grouped
 				.join(partitioned, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
index 95ee4de..9c2d0d2 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
@@ -49,7 +49,7 @@ public class PartitionOperatorTest extends CompilerTestBase {
 					public int partition(Long key, int numPartitions) { return key.intValue(); }
                                }, 1)
                                .groupBy(1)
-				.reduceGroup(new IdentityGroupReducer<Tuple2<Long,Long>>())
+				.reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long,Long>>())
                                .print();
                        
                        Plan p = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java
new file mode 100644
index 0000000..9d8ac2e
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCoGrouper.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.util.Collector;
+
+public class IdentityCoGrouper<T> implements CoGroupFunction<T, T, T> {
+       
+       private static final long serialVersionUID = 1L;
+
+       @Override
+	public void coGroup(Iterable<T> first, Iterable<T> second, Collector<T> out) {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java
new file mode 100644
index 0000000..54b2785
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityCrosser.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.CrossFunction;
+
+public class IdentityCrosser<T> implements CrossFunction<T, T, T> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public T cross(T first, T second) {
+               return first;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
index 11fd044..da4ef17 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
@@ -23,8 +23,6 @@ import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.util.Collector;
 
-
-@Combinable
 public class IdentityGroupReducer<T> extends RichGroupReduceFunction<T, T> {
 
        private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
new file mode 100644
index 0000000..ce24bb6
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.testfunctions;
+
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.util.Collector;
+
+@Combinable
+public class IdentityGroupReducerCombinable<T> extends RichGroupReduceFunction<T, T> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public void reduce(Iterable<T> values, Collector<T> out) {
+               for (T next : values) {
+                       out.collect(next);
+               }
+       }
+}
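
The @Combinable annotation removed from IdentityGroupReducer in the previous hunk now lives only on this new class, so each compiler test can choose whether the optimizer is allowed to insert a combine phase in front of the group reduce. A rough sketch of the distinction, assuming a hypothetical DataSet<Tuple2<Long, Long>> named input:

    // not combinable: all records are shipped to the reducer unchanged
    input.groupBy(0)
            .reduceGroup(new IdentityGroupReducer<Tuple2<Long, Long>>());

    // @Combinable: the optimizer may add a local pre-reduce (combiner)
    input.groupBy(0)
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long, Long>>());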

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java
new file mode 100644
index 0000000..faca2ce
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityJoiner.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+
+public class IdentityJoiner<T> implements JoinFunction<T, T, T> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public T join(T first, T second) {
+               return first;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
deleted file mode 100644
index 6a84c44..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCoGroupStub.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class DummyCoGroupStub extends CoGroupFunction implements Serializable {
-       private static final long serialVersionUID = 1L;
-
-       @Override
-	public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) {
-               while (records1.hasNext()) {
-                       out.collect(records1.next());
-               }
-
-               while (records2.hasNext()) {
-                       out.collect(records2.next());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
deleted file mode 100644
index 8ee2285..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyCrossStub.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import org.apache.flink.api.java.record.functions.CrossFunction;
-import org.apache.flink.types.Record;
-
-@SuppressWarnings("deprecation")
-public class DummyCrossStub extends CrossFunction {
-       private static final long serialVersionUID = 1L;
-
-       @Override
-       public Record cross(Record first, Record second) throws Exception {
-               return first;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
deleted file mode 100644
index 0c816e7..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyInputFormat.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-
-public final class DummyInputFormat extends DelimitedInputFormat {
-       private static final long serialVersionUID = 1L;
-       
-       private final IntValue integer = new IntValue(1);
-
-       @Override
-	public Record readRecord(Record target, byte[] bytes, int offset, int numBytes) {
-               target.setField(0, this.integer);
-               target.setField(1, this.integer);
-               return target;
-       }
-
-       @Override
-	public FileBaseStatistics getStatistics(BaseStatistics cachedStatistics) {
-		return (cachedStatistics instanceof FileBaseStatistics) ? (FileBaseStatistics) cachedStatistics : null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
deleted file mode 100644
index d00be6e..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyMatchStub.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirstExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsFirstExcept({})
-public class DummyMatchStub extends JoinFunction implements Serializable {
-       private static final long serialVersionUID = 1L;
-
-       @Override
-	public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
-               out.collect(value1);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
deleted file mode 100644
index 444b48e..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyNonPreservingMatchStub.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-public class DummyNonPreservingMatchStub extends JoinFunction implements Serializable {
-       private static final long serialVersionUID = 1L;
-
-       @Override
-	public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
-               out.collect(value1);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
deleted file mode 100644
index 1bbe24c..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/DummyOutputFormat.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.util;
-
-
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
-import org.apache.flink.types.Record;
-
-
-public final class DummyOutputFormat extends DelimitedOutputFormat {
-       private static final long serialVersionUID = 1L;
-       
-       @Override
-       public int serializeRecord(Record rec, byte[] target) throws Exception {
-               return 0;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
deleted file mode 100644
index cccc6cb..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityMap.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsExcept({})
-public final class IdentityMap extends MapFunction implements Serializable {
-       private static final long serialVersionUID = 1L;
-       
-       @Override
-       public void map(Record record, Collector<Record> out) throws Exception {
-               out.collect(record);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bd96ba8d/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
deleted file mode 100644
index f45745d..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/IdentityReduce.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsExcept;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-@SuppressWarnings("deprecation")
-@ConstantFieldsExcept({})
-public final class IdentityReduce extends ReduceFunction implements Serializable {
-       private static final long serialVersionUID = 1L;
-       
-       @Override
-	public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
-               while (records.hasNext()) {
-                       out.collect(records.next());
-               }
-       }
-}
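
Taken together, these deletions remove the last Record-API helpers from flink-optimizer/src/test (DummyInputFormat, DummyOutputFormat, DummyMatchStub, DummyNonPreservingMatchStub, DummyCoGroupStub, DummyCrossStub, IdentityMap, IdentityReduce). The tests now rely on the DataSet-API functions under org.apache.flink.optimizer.testfunctions (IdentityJoiner, IdentityMapper, IdentityGroupReducer and its Combinable variant, IdentityCoGrouper, IdentityCrosser), with DiscardingOutputFormat serving as the standard no-op sink.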
