http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
index 2f749d4..4c8177a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableAggregatorITCase.java
@@ -24,9 +24,9 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.JavaProgramTestBase;
@@ -147,8 +147,7 @@ public class 
ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP
                }
        }
 
-       public static final class NeighborWithComponentIDJoin extends 
JoinFunction
-               <Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+       public static final class NeighborWithComponentIDJoin extends 
RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
                private static final long serialVersionUID = 1L;
 
@@ -161,8 +160,7 @@ public class 
ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP
                }
        }
 
-       public static final class MinimumReduce extends GroupReduceFunction
-               <Tuple2<Long, Long>, Tuple2<Long, Long>> {
+       public static final class MinimumReduce extends 
RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
                private static final long serialVersionUID = 1L;
                final Tuple2<Long, Long> resultVertex = new Tuple2<Long, 
Long>();
@@ -189,8 +187,7 @@ public class 
ConnectedComponentsWithParametrizableAggregatorITCase extends JavaP
        }
 
        @SuppressWarnings("serial")
-       public static final class MinimumIdFilter extends FlatMapFunction
-               <Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, 
Long>> {
+       public static final class MinimumIdFilter extends 
RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, 
Tuple2<Long, Long>> {
 
                private static LongSumAggregatorWithParameter aggr;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
index fa1676f..104c3df 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/ConnectedComponentsWithParametrizableConvergenceITCase.java
@@ -25,9 +25,9 @@ import java.util.List;
 
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.JavaProgramTestBase;
@@ -139,8 +139,7 @@ public class 
ConnectedComponentsWithParametrizableConvergenceITCase extends Java
                }
        }
 
-       public static final class NeighborWithComponentIDJoin extends 
JoinFunction
-               <Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+       public static final class NeighborWithComponentIDJoin extends 
RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
                private static final long serialVersionUID = 1L;
 
@@ -153,8 +152,7 @@ public class 
ConnectedComponentsWithParametrizableConvergenceITCase extends Java
                }
        }
 
-       public static final class MinimumReduce extends GroupReduceFunction
-               <Tuple2<Long, Long>, Tuple2<Long, Long>> {
+       public static final class MinimumReduce extends 
RichGroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
                private static final long serialVersionUID = 1L;
                final Tuple2<Long, Long> resultVertex = new Tuple2<Long, 
Long>();
@@ -181,8 +179,7 @@ public class 
ConnectedComponentsWithParametrizableConvergenceITCase extends Java
        }
 
        @SuppressWarnings("serial")
-       public static final class MinimumIdFilter extends FlatMapFunction
-               <Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, 
Long>> {
+       public static final class MinimumIdFilter extends 
RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, 
Tuple2<Long, Long>> {
 
                private static LongSumAggregator aggr;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
index 0475a4f..1ec0eb4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductCoGroup.java
@@ -22,8 +22,8 @@ package 
org.apache.flink.test.iterative.nephele.customdanglingpagerank;
 import java.util.Iterator;
 import java.util.Set;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.iterative.nephele.ConfigUtils;
 import 
org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
@@ -32,7 +32,7 @@ import 
org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStats;
 import 
org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStatsAggregator;
 import org.apache.flink.util.Collector;
 
-public class CustomCompensatableDotProductCoGroup extends AbstractFunction 
implements GenericCoGrouper<VertexWithRankAndDangling, VertexWithRank, 
VertexWithRankAndDangling> {
+public class CustomCompensatableDotProductCoGroup extends AbstractRichFunction 
implements CoGroupFunction<VertexWithRankAndDangling, VertexWithRank, 
VertexWithRankAndDangling> {
 
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java
index 28c77ba..b44d914 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDotProductMatch.java
@@ -22,8 +22,8 @@ package 
org.apache.flink.test.iterative.nephele.customdanglingpagerank;
 import java.util.Random;
 import java.util.Set;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.iterative.nephele.ConfigUtils;
 import 
org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyList;
@@ -31,8 +31,8 @@ import 
org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.Vert
 import 
org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
 import org.apache.flink.util.Collector;
 
-public class CustomCompensatableDotProductMatch extends AbstractFunction 
implements
-               GenericJoiner<VertexWithRankAndDangling, 
VertexWithAdjacencyList, VertexWithRank>
+public class CustomCompensatableDotProductMatch extends AbstractRichFunction 
implements
+               FlatJoinFunction<VertexWithRankAndDangling, 
VertexWithAdjacencyList, VertexWithRank>
 {
        private static final long serialVersionUID = 1L;
        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
index 74426c0..d83b33b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatingMap.java
@@ -21,7 +21,7 @@ package 
org.apache.flink.test.iterative.nephele.customdanglingpagerank;
 
 import java.util.Set;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.GenericCollectorMap;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.iterative.nephele.ConfigUtils;
@@ -29,7 +29,7 @@ import 
org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.Vert
 import org.apache.flink.test.iterative.nephele.danglingpagerank.PageRankStats;
 import org.apache.flink.util.Collector;
 
-public class CustomCompensatingMap extends AbstractFunction implements 
GenericCollectorMap<VertexWithRankAndDangling, VertexWithRankAndDangling> {
+public class CustomCompensatingMap extends AbstractRichFunction implements 
GenericCollectorMap<VertexWithRankAndDangling, VertexWithRankAndDangling> {
        
        private static final long serialVersionUID = 1L;
        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
index 8af9247..1e08a9f 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomRankCombiner.java
@@ -20,15 +20,15 @@ package 
org.apache.flink.test.iterative.nephele.customdanglingpagerank;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericCombine;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import 
org.apache.flink.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
 import org.apache.flink.util.Collector;
 
 
-public class CustomRankCombiner extends AbstractFunction implements 
GenericGroupReduce<VertexWithRank, VertexWithRank>,
-               GenericCombine<VertexWithRank>
+public class CustomRankCombiner extends AbstractRichFunction implements 
GroupReduceFunction<VertexWithRank, VertexWithRank>,
+               FlatCombineFunction<VertexWithRank>
 {
        private static final long serialVersionUID = 1L;
        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index b914c1c..3749c1d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -24,8 +24,9 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
 
-import org.apache.flink.api.java.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
@@ -301,7 +302,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
        
        }
        
-       public static class Tuple5CoGroup extends 
CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, 
Long, Integer, String, Long>, Tuple2<Integer, Integer>> {
+       public static class Tuple5CoGroup implements 
CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, 
Long, Integer, String, Long>, Tuple2<Integer, Integer>> {
 
                private static final long serialVersionUID = 1L;
 
@@ -330,7 +331,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
                }
        }
        
-       public static class CustomTypeCoGroup extends 
CoGroupFunction<CustomType, CustomType, CustomType> {
+       public static class CustomTypeCoGroup implements 
CoGroupFunction<CustomType, CustomType, CustomType> {
 
                private static final long serialVersionUID = 1L;
 
@@ -358,7 +359,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
                
        }
        
-       public static class MixedCoGroup extends 
CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, 
Tuple3<Integer, Long, String>> {
+       public static class MixedCoGroup implements 
CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, CustomType, 
Tuple3<Integer, Long, String>> {
 
                private static final long serialVersionUID = 1L;
 
@@ -388,7 +389,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
                
        }
        
-       public static class MixedCoGroup2 extends CoGroupFunction<CustomType, 
Tuple5<Integer, Long, Integer, String, Long>, CustomType> {
+       public static class MixedCoGroup2 implements 
CoGroupFunction<CustomType, Tuple5<Integer, Long, Integer, String, Long>, 
CustomType> {
 
                private static final long serialVersionUID = 1L;
 
@@ -417,7 +418,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
                
        }
        
-       public static class Tuple3ReturnLeft extends 
CoGroupFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, 
Tuple3<Integer, Long, String>> {
+       public static class Tuple3ReturnLeft implements 
CoGroupFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, 
Tuple3<Integer, Long, String>> {
                
                private static final long serialVersionUID = 1L;
 
@@ -434,7 +435,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
                }
        }
        
-       public static class Tuple5ReturnRight extends 
CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, 
Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
+       public static class Tuple5ReturnRight implements 
CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, 
Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
                
                private static final long serialVersionUID = 1L;
 
@@ -456,7 +457,7 @@ public class CoGroupITCase extends JavaProgramTestBase {
 
        }
        
-       public static class Tuple5CoGroupBC extends 
CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, 
Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
+       public static class Tuple5CoGroupBC extends 
RichCoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, 
Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, 
Integer>> {
 
                private static final long serialVersionUID = 1L;
                

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
index dabe7fc..304dda2 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
@@ -23,7 +23,8 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 
-import org.apache.flink.api.java.functions.CrossFunction;
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.java.functions.RichCrossFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
@@ -400,7 +401,7 @@ public class CrossITCase extends JavaProgramTestBase {
        
        }
        
-       public static class Tuple5Cross extends CrossFunction<Tuple5<Integer, 
Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, 
Tuple2<Integer, String>> {
+       public static class Tuple5Cross implements 
CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, 
Long, Integer, String, Long>, Tuple2<Integer, String>> {
 
                private static final long serialVersionUID = 1L;
 
@@ -416,7 +417,7 @@ public class CrossITCase extends JavaProgramTestBase {
 
        }
        
-       public static class CustomTypeCross extends CrossFunction<CustomType, 
CustomType, CustomType> {
+       public static class CustomTypeCross implements 
CrossFunction<CustomType, CustomType, CustomType> {
 
                private static final long serialVersionUID = 1L;
 
@@ -429,7 +430,7 @@ public class CrossITCase extends JavaProgramTestBase {
                
        }
        
-       public static class MixedCross extends CrossFunction<Tuple5<Integer, 
Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> {
+       public static class MixedCross implements CrossFunction<Tuple5<Integer, 
Long, Integer, String, Long>, CustomType, Tuple3<Integer, Long, String>> {
 
                private static final long serialVersionUID = 1L;
 
@@ -444,7 +445,7 @@ public class CrossITCase extends JavaProgramTestBase {
        }
        
        
-       public static class Tuple3ReturnLeft extends 
CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, 
String, Long>, Tuple3<Integer, Long, String>> {
+       public static class Tuple3ReturnLeft implements 
CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, 
String, Long>, Tuple3<Integer, Long, String>> {
                
                private static final long serialVersionUID = 1L;
 
@@ -457,7 +458,7 @@ public class CrossITCase extends JavaProgramTestBase {
                }
        }
        
-       public static class Tuple5ReturnRight extends 
CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, 
String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
+       public static class Tuple5ReturnRight implements 
CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, 
String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
                
                private static final long serialVersionUID = 1L;
 
@@ -473,7 +474,7 @@ public class CrossITCase extends JavaProgramTestBase {
 
        }
        
-       public static class Tuple5CrossBC extends CrossFunction<Tuple5<Integer, 
Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, 
Tuple3<Integer, Integer, Integer>> {
+       public static class Tuple5CrossBC extends 
RichCrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, 
Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
 
                private static final long serialVersionUID = 1L;
                

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
index 203117c..0c6f3cc 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
@@ -23,7 +23,7 @@ import java.util.Collection;
 import java.util.LinkedList;
 
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
@@ -164,7 +164,7 @@ public class DistinctITCase extends JavaProgramTestBase {
                                                                                
return in.myInt;
                                                                        }
                                                                })
-                                               .map(new 
MapFunction<CollectionDataSets.CustomType, Tuple1<Integer>>() {
+                                               .map(new 
RichMapFunction<CustomType, Tuple1<Integer>>() {
                                                        @Override
                                                        public Tuple1<Integer> 
map(CustomType value) throws Exception {
                                                                return new 
Tuple1<Integer>(value.myInt);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
index 96174da..6613bc1 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FilterITCase.java
@@ -23,7 +23,8 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 
-import org.apache.flink.api.java.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.functions.RichFilterFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
@@ -38,7 +39,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 @RunWith(Parameterized.class)
 public class FilterITCase extends JavaProgramTestBase {
        
-       private static int NUM_PROGRAMS = 8; 
+       private static int NUM_PROGRAMS = 8;
        
        private int curProgId = config.getInteger("ProgramId", -1);
        private String resultPath;
@@ -268,7 +269,7 @@ public class FilterITCase extends JavaProgramTestBase {
                                
                                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
                                DataSet<Tuple3<Integer, Long, String>> filterDs 
= ds.
-                                               filter(new 
FilterFunction<Tuple3<Integer,Long,String>>() {
+                                               filter(new 
RichFilterFunction<Tuple3<Integer,Long,String>>() {
                                                        private static final 
long serialVersionUID = 1L;
 
                                                        int literal = -1;
@@ -306,7 +307,7 @@ public class FilterITCase extends JavaProgramTestBase {
                                
                                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
                                DataSet<Tuple3<Integer, Long, String>> filterDs 
= ds.
-                                               filter(new 
FilterFunction<Tuple3<Integer,Long,String>>() {
+                                               filter(new 
RichFilterFunction<Tuple3<Integer,Long,String>>() {
                                                        private static final 
long serialVersionUID = 1L;
                                                        private  int 
broadcastSum = 0;
                                                        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
index 1c97347..a6dd377 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FlatMapITCase.java
@@ -23,7 +23,8 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 
-import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
@@ -323,7 +324,7 @@ public class FlatMapITCase extends JavaProgramTestBase {
                                
                                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
                                DataSet<Tuple3<Integer, Long, String>> 
bcFlatMapDs = ds.
-                                               flatMap(new 
FlatMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
+                                               flatMap(new 
RichFlatMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() 
{
                                                        private static final 
long serialVersionUID = 1L;
                                                        private final 
Tuple3<Integer, Long, String> outTuple = 
                                                                        new 
Tuple3<Integer, Long, String>();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 6556b5e..7376e86 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -24,10 +24,11 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
 
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
@@ -89,332 +90,336 @@ public class GroupReduceITCase extends 
JavaProgramTestBase {
        private static class GroupReduceProgs {
                
                public static String runProgram(int progId, String resultPath) 
throws Exception {
-                       
-                       switch(progId) {
-                       case 1: {
+
+                       switch (progId) {
+                               case 1: {
                                
                                /*
                                 * check correctness of groupReduce on tuples 
with key field selector
                                 */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-                               DataSet<Tuple2<Integer, Long>> reduceDs = ds.
-                                               groupBy(1).reduceGroup(new 
Tuple3GroupReduce());
-                               
-                               reduceDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "1,1\n" +
-                                               "5,2\n" +
-                                               "15,3\n" +
-                                               "34,4\n" +
-                                               "65,5\n" +
-                                               "111,6\n";
-                       }
-                       case 2: {
+
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env);
+                                       DataSet<Tuple2<Integer, Long>> reduceDs 
= ds.
+                                                       
groupBy(1).reduceGroup(new Tuple3GroupReduce());
+
+                                       reduceDs.writeAsCsv(resultPath);
+                                       env.execute();
+
+                                       // return expected result
+                                       return "1,1\n" +
+                                                       "5,2\n" +
+                                                       "15,3\n" +
+                                                       "34,4\n" +
+                                                       "65,5\n" +
+                                                       "111,6\n";
+                               }
+                               case 2: {
                                
                                /*
                                 * check correctness of groupReduce on tuples 
with multiple key field selector
                                 */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> ds = CollectionDataSets.get5TupleDataSet(env);
-                               DataSet<Tuple5<Integer, Long, Integer, String, 
Long>> reduceDs = ds.
-                                               groupBy(4,0).reduceGroup(new 
Tuple5GroupReduce());
-                               
-                               reduceDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "1,1,0,P-),1\n" +
-                                               "2,3,0,P-),1\n" +
-                                               "2,2,0,P-),2\n" +
-                                               "3,9,0,P-),2\n" +
-                                               "3,6,0,P-),3\n" +
-                                               "4,17,0,P-),1\n" +
-                                               "4,17,0,P-),2\n" +
-                                               "5,11,0,P-),1\n" +
-                                               "5,29,0,P-),2\n" +
-                                               "5,25,0,P-),3\n";
-                       }
-                       case 3: {
+
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                                       DataSet<Tuple5<Integer, Long, Integer, 
String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+                                       DataSet<Tuple5<Integer, Long, Integer, 
String, Long>> reduceDs = ds.
+                                                       groupBy(4, 
0).reduceGroup(new Tuple5GroupReduce());
+
+                                       reduceDs.writeAsCsv(resultPath);
+                                       env.execute();
+
+                                       // return expected result
+                                       return "1,1,0,P-),1\n" +
+                                                       "2,3,0,P-),1\n" +
+                                                       "2,2,0,P-),2\n" +
+                                                       "3,9,0,P-),2\n" +
+                                                       "3,6,0,P-),3\n" +
+                                                       "4,17,0,P-),1\n" +
+                                                       "4,17,0,P-),2\n" +
+                                                       "5,11,0,P-),1\n" +
+                                                       "5,29,0,P-),2\n" +
+                                                       "5,25,0,P-),3\n";
+                               }
+                               case 3: {
                                
                                /*
                                 * check correctness of groupReduce on tuples 
with key field selector and group sorting
                                 */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               env.setDegreeOfParallelism(1);
-                               
-                               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-                               DataSet<Tuple3<Integer, Long, String>> reduceDs 
= ds.
-                                               
groupBy(1).sortGroup(2,Order.ASCENDING).reduceGroup(new 
Tuple3SortedGroupReduce());
-                               
-                               reduceDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "1,1,Hi\n" +
-                                               "5,2,Hello-Hello world\n" +
-                                               "15,3,Hello world, how are 
you?-I am fine.-Luke Skywalker\n" +
-                                               
"34,4,Comment#1-Comment#2-Comment#3-Comment#4\n" +
-                                               
"65,5,Comment#5-Comment#6-Comment#7-Comment#8-Comment#9\n" +
-                                               
"111,6,Comment#10-Comment#11-Comment#12-Comment#13-Comment#14-Comment#15\n";
-                                                               
-                       }
-                       case 4: {
+
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                                       env.setDegreeOfParallelism(1);
+
+                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env);
+                                       DataSet<Tuple3<Integer, Long, String>> 
reduceDs = ds.
+                                                       groupBy(1).sortGroup(2, 
Order.ASCENDING).reduceGroup(new Tuple3SortedGroupReduce());
+
+                                       reduceDs.writeAsCsv(resultPath);
+                                       env.execute();
+
+                                       // return expected result
+                                       return "1,1,Hi\n" +
+                                                       "5,2,Hello-Hello 
world\n" +
+                                                       "15,3,Hello world, how 
are you?-I am fine.-Luke Skywalker\n" +
+                                                       
"34,4,Comment#1-Comment#2-Comment#3-Comment#4\n" +
+                                                       
"65,5,Comment#5-Comment#6-Comment#7-Comment#8-Comment#9\n" +
+                                                       
"111,6,Comment#10-Comment#11-Comment#12-Comment#13-Comment#14-Comment#15\n";
+
+                               }
+                               case 4: {
                                /*
                                 * check correctness of groupReduce on tuples 
with key extractor
                                 */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-                               DataSet<Tuple2<Integer, Long>> reduceDs = ds.
-                                               groupBy(new 
KeySelector<Tuple3<Integer,Long,String>, Long>() {
-                                                                       private 
static final long serialVersionUID = 1L;
-                                                                       
@Override
-                                                                       public 
Long getKey(Tuple3<Integer, Long, String> in) {
-                                                                               
return in.f1;
-                                                                       }
-                                                               
}).reduceGroup(new Tuple3GroupReduce());
-                               
-                               reduceDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "1,1\n" +
-                                               "5,2\n" +
-                                               "15,3\n" +
-                                               "34,4\n" +
-                                               "65,5\n" +
-                                               "111,6\n";
-                               
-                       }
-                       case 5: {
+
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env);
+                                       DataSet<Tuple2<Integer, Long>> reduceDs 
= ds.
+                                                       groupBy(new 
KeySelector<Tuple3<Integer, Long, String>, Long>() {
+                                                               private static 
final long serialVersionUID = 1L;
+
+                                                               @Override
+                                                               public Long 
getKey(Tuple3<Integer, Long, String> in) {
+                                                                       return 
in.f1;
+                                                               }
+                                                       }).reduceGroup(new 
Tuple3GroupReduce());
+
+                                       reduceDs.writeAsCsv(resultPath);
+                                       env.execute();
+
+                                       // return expected result
+                                       return "1,1\n" +
+                                                       "5,2\n" +
+                                                       "15,3\n" +
+                                                       "34,4\n" +
+                                                       "65,5\n" +
+                                                       "111,6\n";
+
+                               }
+                               case 5: {
                                
                                /*
                                 * check correctness of groupReduce on custom 
type with type extractor
                                 */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<CustomType> ds = 
CollectionDataSets.getCustomTypeDataSet(env);
-                               DataSet<CustomType> reduceDs = ds.
-                                               groupBy(new 
KeySelector<CustomType, Integer>() {
-                                                                       private 
static final long serialVersionUID = 1L;
-                                                                       
@Override
-                                                                       public 
Integer getKey(CustomType in) {
-                                                                               
return in.myInt;
-                                                                       }
-                                                               
}).reduceGroup(new CustomTypeGroupReduce());
-                               
-                               reduceDs.writeAsText(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "1,0,Hello!\n" +
-                                               "2,3,Hello!\n" +
-                                               "3,12,Hello!\n" +
-                                               "4,30,Hello!\n" +
-                                               "5,60,Hello!\n" +
-                                               "6,105,Hello!\n";
-                       }
-                       case 6: {
+
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                                       DataSet<CustomType> ds = 
CollectionDataSets.getCustomTypeDataSet(env);
+                                       DataSet<CustomType> reduceDs = ds.
+                                                       groupBy(new 
KeySelector<CustomType, Integer>() {
+                                                               private static 
final long serialVersionUID = 1L;
+
+                                                               @Override
+                                                               public Integer 
getKey(CustomType in) {
+                                                                       return 
in.myInt;
+                                                               }
+                                                       }).reduceGroup(new 
CustomTypeGroupReduce());
+
+                                       reduceDs.writeAsText(resultPath);
+                                       env.execute();
+
+                                       // return expected result
+                                       return "1,0,Hello!\n" +
+                                                       "2,3,Hello!\n" +
+                                                       "3,12,Hello!\n" +
+                                                       "4,30,Hello!\n" +
+                                                       "5,60,Hello!\n" +
+                                                       "6,105,Hello!\n";
+                               }
+                               case 6: {
                                
                                /*
                                 * check correctness of all-groupreduce for 
tuples
                                 */
 
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-                               DataSet<Tuple3<Integer, Long, String>> reduceDs 
= ds.reduceGroup(new AllAddingTuple3GroupReduce());
-                               
-                               reduceDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "231,91,Hello World\n";
-                       }
-                       case 7: {
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env);
+                                       DataSet<Tuple3<Integer, Long, String>> 
reduceDs = ds.reduceGroup(new AllAddingTuple3GroupReduce());
+
+                                       reduceDs.writeAsCsv(resultPath);
+                                       env.execute();
+
+                                       // return expected result
+                                       return "231,91,Hello World\n";
+                               }
+                               case 7: {
                                /*
                                 * check correctness of all-groupreduce for 
custom types
                                 */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<CustomType> ds = 
CollectionDataSets.getCustomTypeDataSet(env);
-                               DataSet<CustomType> reduceDs = 
ds.reduceGroup(new AllAddingCustomTypeGroupReduce());
-                               
-                               reduceDs.writeAsText(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "91,210,Hello!";
-                       }
-                       case 8: {
+
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                                       DataSet<CustomType> ds = 
CollectionDataSets.getCustomTypeDataSet(env);
+                                       DataSet<CustomType> reduceDs = 
ds.reduceGroup(new AllAddingCustomTypeGroupReduce());
+
+                                       reduceDs.writeAsText(resultPath);
+                                       env.execute();
+
+                                       // return expected result
+                                       return "91,210,Hello!";
+                               }
+                               case 8: {
                                
                                /*
                                 * check correctness of groupReduce with 
broadcast set
                                 */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Integer> intDs = 
CollectionDataSets.getIntegerDataSet(env);
-                               
-                               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-                               DataSet<Tuple3<Integer, Long, String>> reduceDs 
= ds.
-                                               groupBy(1).reduceGroup(new 
BCTuple3GroupReduce()).withBroadcastSet(intDs, "ints");
-                               
-                               reduceDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "1,1,55\n" +
-                                               "5,2,55\n" +
-                                               "15,3,55\n" +
-                                               "34,4,55\n" +
-                                               "65,5,55\n" +
-                                               "111,6,55\n";
-                       }
-                       case 9: {
+
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                                       DataSet<Integer> intDs = 
CollectionDataSets.getIntegerDataSet(env);
+
+                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env);
+                                       DataSet<Tuple3<Integer, Long, String>> 
reduceDs = ds.
+                                                       
groupBy(1).reduceGroup(new BCTuple3GroupReduce()).withBroadcastSet(intDs, 
"ints");
+
+                                       reduceDs.writeAsCsv(resultPath);
+                                       env.execute();
+
+                                       // return expected result
+                                       return "1,1,55\n" +
+                                                       "5,2,55\n" +
+                                                       "15,3,55\n" +
+                                                       "34,4,55\n" +
+                                                       "65,5,55\n" +
+                                                       "111,6,55\n";
+                               }
+                               case 9: {
                                
                                /*
                                 * check correctness of groupReduce if UDF 
returns input objects multiple times and changes it in between
                                 */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-                               DataSet<Tuple3<Integer, Long, String>> reduceDs 
= ds.
-                                               groupBy(1).reduceGroup(new 
InputReturningTuple3GroupReduce());
-                               
-                               reduceDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "11,1,Hi!\n" +
-                                               "21,1,Hi again!\n" +
-                                               "12,2,Hi!\n" +
-                                               "22,2,Hi again!\n" +
-                                               "13,2,Hi!\n" +
-                                               "23,2,Hi again!\n";
-                       }
-                       case 10: {
+
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env);
+                                       DataSet<Tuple3<Integer, Long, String>> 
reduceDs = ds.
+                                                       
groupBy(1).reduceGroup(new InputReturningTuple3GroupReduce());
+
+                                       reduceDs.writeAsCsv(resultPath);
+                                       env.execute();
+
+                                       // return expected result
+                                       return "11,1,Hi!\n" +
+                                                       "21,1,Hi again!\n" +
+                                                       "12,2,Hi!\n" +
+                                                       "22,2,Hi again!\n" +
+                                                       "13,2,Hi!\n" +
+                                                       "23,2,Hi again!\n";
+                               }
+                               case 10: {
                                
                                /*
                                 * check correctness of groupReduce on custom 
type with key extractor and combine
                                 */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<CustomType> ds = 
CollectionDataSets.getCustomTypeDataSet(env);
-                               DataSet<CustomType> reduceDs = ds.
-                                               groupBy(new 
KeySelector<CustomType, Integer>() {
-                                                                       private 
static final long serialVersionUID = 1L;
-                                                                       
@Override
-                                                                       public 
Integer getKey(CustomType in) {
-                                                                               
return in.myInt;
-                                                                       }
-                                                               
}).reduceGroup(new CustomTypeGroupReduceWithCombine());
-                               
-                               reduceDs.writeAsText(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "1,0,test1\n" +
-                                               "2,3,test2\n" +
-                                               "3,12,test3\n" +
-                                               "4,30,test4\n" +
-                                               "5,60,test5\n" +
-                                               "6,105,test6\n";
-                       }
-                       case 11: {
+
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                                       DataSet<CustomType> ds = 
CollectionDataSets.getCustomTypeDataSet(env);
+                                       DataSet<CustomType> reduceDs = ds.
+                                                       groupBy(new 
KeySelector<CustomType, Integer>() {
+                                                               private static 
final long serialVersionUID = 1L;
+
+                                                               @Override
+                                                               public Integer 
getKey(CustomType in) {
+                                                                       return 
in.myInt;
+                                                               }
+                                                       }).reduceGroup(new 
CustomTypeGroupReduceWithCombine());
+
+                                       reduceDs.writeAsText(resultPath);
+                                       env.execute();
+
+                                       // return expected result
+                                       return "1,0,test1\n" +
+                                                       "2,3,test2\n" +
+                                                       "3,12,test3\n" +
+                                                       "4,30,test4\n" +
+                                                       "5,60,test5\n" +
+                                                       "6,105,test6\n";
+                               }
+                               case 11: {
                                
                                /*
                                 * check correctness of groupReduce on tuples 
with combine
                                 */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               env.setDegreeOfParallelism(2); // important 
because it determines how often the combiner is called
-                               
-                               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-                               DataSet<Tuple2<Integer, String>> reduceDs = ds.
-                                               groupBy(1).reduceGroup(new 
Tuple3GroupReduceWithCombine());
-                               
-                               reduceDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "1,test1\n" +
-                                               "5,test2\n" +
-                                               "15,test3\n" +
-                                               "34,test4\n" +
-                                               "65,test5\n" +
-                                               "111,test6\n";
-                       }
-                       // all-groupreduce with combine
-                       case 12: {
+
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                                       env.setDegreeOfParallelism(2); // 
important because it determines how often the combiner is called
+
+                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env);
+                                       DataSet<Tuple2<Integer, String>> 
reduceDs = ds.
+                                                       
groupBy(1).reduceGroup(new Tuple3GroupReduceWithCombine());
+
+                                       reduceDs.writeAsCsv(resultPath);
+                                       env.execute();
+
+                                       // return expected result
+                                       return "1,test1\n" +
+                                                       "5,test2\n" +
+                                                       "15,test3\n" +
+                                                       "34,test4\n" +
+                                                       "65,test5\n" +
+                                                       "111,test6\n";
+                               }
+                               // all-groupreduce with combine
+                               case 12: {
                                
                                /*
                                 * check correctness of all-groupreduce for 
tuples with combine
                                 */
 
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               
-                               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env)
-                                                       .map(new 
IdentityMapper<Tuple3<Integer,Long,String>>()).setParallelism(4);
-                               
-                               Configuration cfg = new Configuration();
-                               cfg.setString(PactCompiler.HINT_SHIP_STRATEGY, 
PactCompiler.HINT_SHIP_STRATEGY_REPARTITION);
-                               DataSet<Tuple2<Integer, String>> reduceDs = 
ds.reduceGroup(new Tuple3AllGroupReduceWithCombine())
-                                               .withParameters(cfg);
-                               
-                               reduceDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return 
"322,testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttest\n";
-                       }
-                       // descending sort not working
-                       case 13: {
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env)
+                                                       .map(new 
IdentityMapper<Tuple3<Integer, Long, String>>()).setParallelism(4);
+
+                                       Configuration cfg = new Configuration();
+                                       
cfg.setString(PactCompiler.HINT_SHIP_STRATEGY, 
PactCompiler.HINT_SHIP_STRATEGY_REPARTITION);
+                                       DataSet<Tuple2<Integer, String>> 
reduceDs = ds.reduceGroup(new Tuple3AllGroupReduceWithCombine())
+                                                       .withParameters(cfg);
+
+                                       reduceDs.writeAsCsv(resultPath);
+                                       env.execute();
+
+                                       // return expected result
+                                       return 
"322,testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttest\n";
+                               }
+                               // descending sort not working
+                               case 13: {
                                
                                /*
                                 * check correctness of groupReduce on tuples 
with key field selector and group sorting
                                 */
-                               
-                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                               env.setDegreeOfParallelism(1);
-                               
-                               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-                               DataSet<Tuple3<Integer, Long, String>> reduceDs 
= ds.
-                                               
groupBy(1).sortGroup(2,Order.DESCENDING).reduceGroup(new 
Tuple3SortedGroupReduce());
-                               
-                               reduceDs.writeAsCsv(resultPath);
-                               env.execute();
-                               
-                               // return expected result
-                               return "1,1,Hi\n" +
-                                               "5,2,Hello world-Hello\n" +
-                                               "15,3,Luke Skywalker-I am 
fine.-Hello world, how are you?\n" +
-                                               
"34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" +
-                                               
"65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" +
-                                               
"111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
-                               
-                       }
-                       default: 
-                       throw new IllegalArgumentException("Invalid program 
id");
+
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                                       env.setDegreeOfParallelism(1);
+
+                                       DataSet<Tuple3<Integer, Long, String>> 
ds = CollectionDataSets.get3TupleDataSet(env);
+                                       DataSet<Tuple3<Integer, Long, String>> 
reduceDs = ds.
+                                                       groupBy(1).sortGroup(2, 
Order.DESCENDING).reduceGroup(new Tuple3SortedGroupReduce());
+
+                                       reduceDs.writeAsCsv(resultPath);
+                                       env.execute();
+
+                                       // return expected result
+                                       return "1,1,Hi\n" +
+                                                       "5,2,Hello 
world-Hello\n" +
+                                                       "15,3,Luke Skywalker-I 
am fine.-Hello world, how are you?\n" +
+                                                       
"34,4,Comment#4-Comment#3-Comment#2-Comment#1\n" +
+                                                       
"65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n" +
+                                                       
"111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
+
+                               }
+                               default: {
+                                       throw new 
IllegalArgumentException("Invalid program id");
+                               }
                        }
                }
        
        }
        
-       public static class Tuple3GroupReduce extends 
GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
+       public static class Tuple3GroupReduce implements 
GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
                private static final long serialVersionUID = 1L;
 
 
@@ -436,7 +441,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
                }
        }
        
-       public static class Tuple3SortedGroupReduce extends 
GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, 
String>> {
+       public static class Tuple3SortedGroupReduce implements 
GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, 
String>> {
                private static final long serialVersionUID = 1L;
 
 
@@ -462,7 +467,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
                }
        }
        
-       public static class Tuple5GroupReduce extends 
GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, 
Tuple5<Integer, Long, Integer, String, Long>> {
+       public static class Tuple5GroupReduce implements 
GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, 
Tuple5<Integer, Long, Integer, String, Long>> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -486,7 +491,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
                }
        }
        
-       public static class CustomTypeGroupReduce extends 
GroupReduceFunction<CustomType, CustomType> {
+       public static class CustomTypeGroupReduce implements 
GroupReduceFunction<CustomType, CustomType> {
                private static final long serialVersionUID = 1L;
                
 
@@ -511,8 +516,9 @@ public class GroupReduceITCase extends JavaProgramTestBase {
                        
                }
        }
-       
-       public static class InputReturningTuple3GroupReduce extends 
GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, 
String>> {
+
+
+       public static class InputReturningTuple3GroupReduce implements 
GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, 
String>> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -534,7 +540,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
                }
        }
        
-       public static class AllAddingTuple3GroupReduce extends 
GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, 
String>> {
+       public static class AllAddingTuple3GroupReduce implements 
GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, 
String>> {
                private static final long serialVersionUID = 1L;
                
                @Override
@@ -554,7 +560,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
                }
        }
        
-       public static class AllAddingCustomTypeGroupReduce extends 
GroupReduceFunction<CustomType, CustomType> {
+       public static class AllAddingCustomTypeGroupReduce implements 
GroupReduceFunction<CustomType, CustomType> {
                private static final long serialVersionUID = 1L;
                
                @Override
@@ -579,7 +585,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
                }
        }
        
-       public static class BCTuple3GroupReduce extends 
GroupReduceFunction<Tuple3<Integer, Long, String>,Tuple3<Integer, Long, 
String>> {
+       public static class BCTuple3GroupReduce extends 
RichGroupReduceFunction<Tuple3<Integer, Long, String>,Tuple3<Integer, Long, 
String>> {
                private static final long serialVersionUID = 1L;
                private String f2Replace = "";
                
@@ -613,8 +619,8 @@ public class GroupReduceITCase extends JavaProgramTestBase {
                }
        }
        
-       @org.apache.flink.api.java.functions.GroupReduceFunction.Combinable
-       public static class Tuple3GroupReduceWithCombine extends 
GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> {
+       @RichGroupReduceFunction.Combinable
+       public static class Tuple3GroupReduceWithCombine extends 
RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> 
{
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -650,8 +656,8 @@ public class GroupReduceITCase extends JavaProgramTestBase {
                }
        }
        
-       @org.apache.flink.api.java.functions.GroupReduceFunction.Combinable
-       public static class Tuple3AllGroupReduceWithCombine extends 
GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> {
+       @RichGroupReduceFunction.Combinable
+       public static class Tuple3AllGroupReduceWithCombine extends 
RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> 
{
                private static final long serialVersionUID = 1L;
                
                @Override
@@ -686,8 +692,8 @@ public class GroupReduceITCase extends JavaProgramTestBase {
                }
        }
        
-       @org.apache.flink.api.java.functions.GroupReduceFunction.Combinable
-       public static class CustomTypeGroupReduceWithCombine extends 
GroupReduceFunction<CustomType, CustomType> {
+       @RichGroupReduceFunction.Combinable
+       public static class CustomTypeGroupReduceWithCombine extends 
RichGroupReduceFunction<CustomType, CustomType> {
                private static final long serialVersionUID = 1L;
                
                @Override
@@ -723,7 +729,7 @@ public class GroupReduceITCase extends JavaProgramTestBase {
                }
        }
        
-       public static final class IdentityMapper<T> extends MapFunction<T, T> {
+       public static final class IdentityMapper<T> extends RichMapFunction<T, 
T> {
 
                @Override
                public T map(T value) { return value; }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
index ffad949..a293cbf 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
@@ -23,8 +23,10 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.RichFlatJoinFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
@@ -33,6 +35,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
 import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -101,7 +104,7 @@ public class JoinITCase extends JavaProgramTestBase {
                                                ds1.join(ds2)
                                                .where(1)
                                                .equalTo(1)
-                                               .with(new T3T5Join());
+                                               .with(new T3T5FlatJoin());
                                
                                joinDs.writeAsCsv(resultPath);
                                env.execute();
@@ -126,7 +129,7 @@ public class JoinITCase extends JavaProgramTestBase {
                                                ds1.join(ds2)
                                                   .where(0,1)
                                                   .equalTo(0,4)
-                                                  .with(new T3T5Join());
+                                                  .with(new T3T5FlatJoin());
                                
                                joinDs.writeAsCsv(resultPath);
                                env.execute();
@@ -177,7 +180,7 @@ public class JoinITCase extends JavaProgramTestBase {
                                DataSet<Tuple2<String, String>> joinDs = 
ds1.joinWithHuge(ds2)
                                                                                
                                        .where(1)
                                                                                
                                        .equalTo(1)
-                                                                               
                                        .with(new T3T5Join());
+                                                                               
                                        .with(new T3T5FlatJoin());
                                
                                joinDs.writeAsCsv(resultPath);
                                env.execute();
@@ -202,7 +205,7 @@ public class JoinITCase extends JavaProgramTestBase {
                                                ds1.joinWithTiny(ds2)
                                                   .where(1)
                                                   .equalTo(1)
-                                                  .with(new T3T5Join());
+                                                  .with(new T3T5FlatJoin());
                                
                                joinDs.writeAsCsv(resultPath);
                                env.execute();
@@ -292,35 +295,35 @@ public class JoinITCase extends JavaProgramTestBase {
                        }
                        case 9: {
                        
-                       /*
-                        * Join on a tuple input with key field selector and a 
custom type input with key extractor
-                        */
-                       
-                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                               /*
+                                * Join on a tuple input with key field 
selector and a custom type input with key extractor
+                                */
 
-                       DataSet<CustomType> ds1 = 
CollectionDataSets.getSmallCustomTypeDataSet(env);
-                       DataSet<Tuple3<Integer, Long, String>> ds2 = 
CollectionDataSets.get3TupleDataSet(env);
-                       DataSet<Tuple2<String, String>> joinDs = 
-                                       ds1.join(ds2)
-                                          .where(new KeySelector<CustomType, 
Integer>() {
-                                                                  @Override
-                                                                  public 
Integer getKey(CustomType value) {
-                                                                          
return value.myInt;
-                                                                  }
-                                                          }
-                                                          )
-                                          .equalTo(0)
-                                          .with(new CustT3Join());
-                       
-                       joinDs.writeAsCsv(resultPath);
-                       env.execute();
-                       
-                       // return expected result
-                       return "Hi,Hi\n" +
-                                       "Hello,Hello\n" +
-                                       "Hello world,Hello\n";
-                       
-                       }
+                               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                               DataSet<CustomType> ds1 = 
CollectionDataSets.getSmallCustomTypeDataSet(env);
+                               DataSet<Tuple3<Integer, Long, String>> ds2 = 
CollectionDataSets.get3TupleDataSet(env);
+                               DataSet<Tuple2<String, String>> joinDs =
+                                               ds1.join(ds2)
+                                                  .where(new 
KeySelector<CustomType, Integer>() {
+                                                                         
@Override
+                                                                         
public Integer getKey(CustomType value) {
+                                                                               
  return value.myInt;
+                                                                         }
+                                                                 }
+                                                  )
+                                                  .equalTo(0)
+                                                  .with(new CustT3Join());
+
+                               joinDs.writeAsCsv(resultPath);
+                               env.execute();
+
+                               // return expected result
+                               return "Hi,Hi\n" +
+                                               "Hello,Hello\n" +
+                                               "Hello world,Hello\n";
+
+                               }
                        case 10: {
                                
                                /*
@@ -458,38 +461,39 @@ public class JoinITCase extends JavaProgramTestBase {
        
        }
        
-       public static class T3T5Join extends JoinFunction<Tuple3<Integer, Long, 
String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<String, String>> {
+       public static class T3T5FlatJoin implements 
FlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, 
String, Long>, Tuple2<String, String>> {
 
                @Override
-               public Tuple2<String, String> join(Tuple3<Integer, Long, 
String> first,
-                               Tuple5<Integer, Long, Integer, String, Long> 
second)  {
-                       
-                       return new Tuple2<String,String>(first.f2, second.f3);
+               public void join(Tuple3<Integer, Long, String> first,
+                               Tuple5<Integer, Long, Integer, String, Long> 
second,
+                               Collector<Tuple2<String,String>> out)  {
+
+                       out.collect (new Tuple2<String,String> (first.f2, 
second.f3));
                }
-               
+
        }
        
-       public static class LeftReturningJoin extends 
JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, 
String, Long>, Tuple3<Integer, Long, String>> {
+       public static class LeftReturningJoin implements 
JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, 
String, Long>, Tuple3<Integer, Long, String>> {
 
                @Override
                public Tuple3<Integer, Long, String> join(Tuple3<Integer, Long, 
String> first,
-                               Tuple5<Integer, Long, Integer, String, Long> 
second) {
+                                                                               
                  Tuple5<Integer, Long, Integer, String, Long> second) {
                        
                        return first;
                }
        }
        
-       public static class RightReturningJoin extends 
JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, 
String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
+       public static class RightReturningJoin implements 
JoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, 
String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
 
                @Override
                public Tuple5<Integer, Long, Integer, String, Long> 
join(Tuple3<Integer, Long, String> first,
-                               Tuple5<Integer, Long, Integer, String, Long> 
second) {
+                                                                               
                                                 Tuple5<Integer, Long, Integer, 
String, Long> second) {
                        
                        return second;
                }
        }
                
-       public static class T3T5BCJoin extends JoinFunction<Tuple3<Integer, 
Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<String, 
String, Integer>> {
+       public static class T3T5BCJoin extends 
RichFlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, 
Integer, String, Long>, Tuple3<String, String, Integer>> {
 
                private int broadcast;
                
@@ -505,6 +509,7 @@ public class JoinITCase extends JavaProgramTestBase {
                        
                }
 
+               /*
                @Override
                public Tuple3<String, String, Integer> join(
                                Tuple3<Integer, Long, String> first,
@@ -512,19 +517,25 @@ public class JoinITCase extends JavaProgramTestBase {
 
                        return new Tuple3<String, String, Integer>(first.f2, 
second.f3, broadcast);
                }
+               */
+
+               @Override
+               public void join(Tuple3<Integer, Long, String> first, 
Tuple5<Integer, Long, Integer, String, Long> second, Collector<Tuple3<String, 
String, Integer>> out) throws Exception {
+                       out.collect(new Tuple3<String, String, Integer> 
(first.f2, second.f3, broadcast));
+               }
        }
        
-       public static class T3CustJoin extends JoinFunction<Tuple3<Integer, 
Long, String>, CustomType, Tuple2<String, String>> {
+       public static class T3CustJoin implements JoinFunction<Tuple3<Integer, 
Long, String>, CustomType, Tuple2<String, String>> {
 
                @Override
                public Tuple2<String, String> join(Tuple3<Integer, Long, 
String> first,
-                               CustomType second) {
+                                                                               
   CustomType second) {
 
                        return new Tuple2<String, String>(first.f2, 
second.myString);
                }
        }
        
-       public static class CustT3Join extends JoinFunction<CustomType, 
Tuple3<Integer, Long, String>, Tuple2<String, String>> {
+       public static class CustT3Join implements JoinFunction<CustomType, 
Tuple3<Integer, Long, String>, Tuple2<String, String>> {
 
                @Override
                public Tuple2<String, String> join(CustomType first, 
Tuple3<Integer, Long, String> second) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
index 0921e82..4f1fb1a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/MapITCase.java
@@ -25,7 +25,8 @@ import java.util.LinkedList;
 
 import org.junit.Assert;
 
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
@@ -396,7 +397,7 @@ public class MapITCase extends JavaProgramTestBase {
                                
                                DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
                                DataSet<Tuple3<Integer, Long, String>> bcMapDs 
= ds.
-                                               map(new 
MapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
+                                               map(new 
RichMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
                                                        private static final 
long serialVersionUID = 1L;
                                                        private final 
Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
                                                        private Integer 
f2Replace = 0;
@@ -457,7 +458,7 @@ public class MapITCase extends JavaProgramTestBase {
                                final int testValue = 666;
                                conf.setInteger(testKey, testValue);
                                DataSet<Tuple3<Integer, Long, String>> bcMapDs 
= ds.
-                                               map(new 
MapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
+                                               map(new 
RichMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
                                                        private static final 
long serialVersionUID = 1L;
                                                        
                                                        @Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
index 6cc1061..a296a09 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
@@ -23,8 +23,9 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.configuration.Configuration;
@@ -270,7 +271,7 @@ public class ReduceITCase extends JavaProgramTestBase {
                                                "65,5,Hi again!\n" +
                                                "111,6,Hi again!\n";
                        }
-                       default: 
+                       default:
                                throw new IllegalArgumentException("Invalid 
program id");
                        }
                        
@@ -278,7 +279,7 @@ public class ReduceITCase extends JavaProgramTestBase {
        
        }
        
-       public static class Tuple3Reduce extends ReduceFunction<Tuple3<Integer, 
Long, String>> {
+       public static class Tuple3Reduce implements 
ReduceFunction<Tuple3<Integer, Long, String>> {
                private static final long serialVersionUID = 1L;
                private final Tuple3<Integer, Long, String> out = new 
Tuple3<Integer, Long, String>();
                private final String f2Replace;
@@ -306,7 +307,7 @@ public class ReduceITCase extends JavaProgramTestBase {
                }
        }
        
-       public static class Tuple5Reduce extends ReduceFunction<Tuple5<Integer, 
Long, Integer, String, Long>> {
+       public static class Tuple5Reduce implements 
ReduceFunction<Tuple5<Integer, Long, Integer, String, Long>> {
                private static final long serialVersionUID = 1L;
                private final Tuple5<Integer, Long, Integer, String, Long> out 
= new Tuple5<Integer, Long, Integer, String, Long>();
                
@@ -321,7 +322,7 @@ public class ReduceITCase extends JavaProgramTestBase {
                }
        }
        
-       public static class CustomTypeReduce extends ReduceFunction<CustomType> 
{
+       public static class CustomTypeReduce implements 
ReduceFunction<CustomType> {
                private static final long serialVersionUID = 1L;
                private final CustomType out = new CustomType();
                
@@ -336,7 +337,7 @@ public class ReduceITCase extends JavaProgramTestBase {
                }
        }
        
-       public static class InputReturningTuple3Reduce extends 
ReduceFunction<Tuple3<Integer, Long, String>> {
+       public static class InputReturningTuple3Reduce implements 
ReduceFunction<Tuple3<Integer, Long, String>> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -350,7 +351,7 @@ public class ReduceITCase extends JavaProgramTestBase {
                }
        }
        
-       public static class AllAddingTuple3Reduce extends 
ReduceFunction<Tuple3<Integer, Long, String>> {
+       public static class AllAddingTuple3Reduce implements 
ReduceFunction<Tuple3<Integer, Long, String>> {
                private static final long serialVersionUID = 1L;
                private final Tuple3<Integer, Long, String> out = new 
Tuple3<Integer, Long, String>();
                
@@ -364,7 +365,7 @@ public class ReduceITCase extends JavaProgramTestBase {
                }
        }
        
-       public static class AllAddingCustomTypeReduce extends 
ReduceFunction<CustomType> {
+       public static class AllAddingCustomTypeReduce implements 
ReduceFunction<CustomType> {
                private static final long serialVersionUID = 1L;
                private final CustomType out = new CustomType();
                
@@ -379,7 +380,7 @@ public class ReduceITCase extends JavaProgramTestBase {
                }
        }
        
-       public static class BCTuple3Reduce extends 
ReduceFunction<Tuple3<Integer, Long, String>> {
+       public static class BCTuple3Reduce extends 
RichReduceFunction<Tuple3<Integer, Long, String>> {
                private static final long serialVersionUID = 1L;
                private final Tuple3<Integer, Long, String> out = new 
Tuple3<Integer, Long, String>();
                private String f2Replace = "";

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
index 09191cc..a636ba4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/UnionITCase.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 
-import org.apache.flink.api.java.functions.FilterFunction;
+import org.apache.flink.api.java.functions.RichFilterFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
@@ -145,7 +145,7 @@ public class UnionITCase extends JavaProgramTestBase {
                                
                                // Don't know how to make an empty result in an 
other way than filtering it 
                                DataSet<Tuple3<Integer, Long, String>> empty = 
CollectionDataSets.get3TupleDataSet(env).
-                                               filter(new 
FilterFunction<Tuple3<Integer,Long,String>>() {
+                                               filter(new 
RichFilterFunction<Tuple3<Integer,Long,String>>() {
                                                        private static final 
long serialVersionUID = 1L;
 
                                                        @Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
index ed573be..aaad08c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.Collector;
+import org.junit.Ignore;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -49,6 +50,7 @@ import java.util.LinkedList;
 /**
  */
 @RunWith(Parameterized.class)
+//@Ignore("Test needs to be adapted to new cross signature")
 public class CrossITCase extends RecordAPITestBase {
 
        private static final Log LOG = LogFactory.getLog(CrossITCase.class);
@@ -61,13 +63,30 @@ public class CrossITCase extends RecordAPITestBase {
                super(testConfig);
        }
 
-       private static final String LEFT_IN = "1 1\n2 2\n1 1\n2 2\n3 3\n4 4\n3 
3\n4 4\n";
+       //private static final String LEFT_IN = "1 1\n2 2\n1 1\n2 2\n3 3\n4 
4\n3 3\n4 4\n";
 
-       private static final String RIGHT_IN = "1 1\n1 2\n2 2\n2 4\n3 3\n3 6\n4 
4\n4 8\n";
+       //private static final String RIGHT_IN = "1 1\n1 2\n2 2\n2 4\n3 3\n3 
6\n4 4\n4 8\n";
 
-       private static final String RESULT = "4 1\n4 1\n4 2\n4 2\n5 2\n5 2\n5 
4\n5 4\n6 3\n6 3\n7 4\n7 4\n"
-               + "5 0\n5 0\n5 1\n5 1\n6 1\n6 1\n6 3\n6 3\n7 2\n7 2\n8 3\n8 3\n"
-               + "6 -1\n6 -1\n6 0\n6 0\n7 0\n7 0\n8 1\n8 1\n" + "7 -2\n7 -2\n7 
-1\n7 -1\n8 -1\n8 -1\n";
+       //private static final String RESULT = "4 1\n4 1\n4 2\n4 2\n5 2\n5 2\n5 
4\n5 4\n6 3\n6 3\n7 4\n7 4\n"
+       //      + "5 0\n5 0\n5 1\n5 1\n6 1\n6 1\n6 3\n6 3\n7 2\n7 2\n8 3\n8 3\n"
+       //      + "6 -1\n6 -1\n6 0\n6 0\n7 0\n7 0\n8 1\n8 1\n" + "7 -2\n7 -2\n7 
-1\n7 -1\n8 -1\n8 -1\n";
+
+       //private static final String RESULT = "10 1\n10 1\n10 5\n10 5\n4 1\n4 
1\n4 2\n4 2\n5 0\n5 0\n5 1\n," +
+       //              "5 1\n5 2\n5 2\n5 4\n5 4\n6 -1\n6 -1\n6 0\n6 0\n6 1\n6 
1\n6 3\n6 3\n6 3\n6 3\n6 6\n6 6\n7 -1\n" +
+       //              "7 -1\n7 -2\n7 -2\n7 0\n7 0\n7 2\n7 2\n7 2\n7 2\n7 4\n7 
4\n7 5\n7 5\n7 8\n7 8\n8 -1\n8 -1\n8 1\n" +
+       //              "8 1\n8 1\n8 1\n8 3\n8 3\n8 4\n8 4\n8 7\n8 7\n9 0\n9 
0\n9 2\n9 2\n9 3\n9 3\n9 6\n9 6\n";
+
+       //private static final String RESULT = "2 2\n4 4\n1 1\n3 3\n2 2\n4 4\n1 
1\n3 3\n5 0\n5 1\n6 1\n 6 3\n" +
+       //              "7 2\n7 5\n8 3\n8 7\n7 -2\n7 -1\n8 -1\n8 1\n9 0\n9 
3\n10 1\n10 5\n4 1\n4 2\n5 2\n5 4\n6 3\n" +
+       //              "6 6\n7 4\n7 8\n6 -1\n6 0\n7 0\n7 2\n8 1\n8 4\n9 2\n9 
6\n5 0\n5 1\n6 1\n6 3\n7 2\n7 5\n 8 3\n" +
+       //              "8 7\n7 -2\n7 -1\n8 -1\n8 1\n9 0\n9 3\n10 1\n10 5\n4 
1\n4 2\n5 2\n5 4\n6 3\n6 6\n7 4\n7 8\n" +
+       //              "6 -1\n6 0\n7 0\n7 2\n8 1\n8 4\n9 2\n9 6";
+
+
+       private static final String LEFT_IN = "1 1\n2 2\n3 3\n";
+       private static final String RIGHT_IN = "3 6\n4 4\n4 8\n";
+
+       private static final String RESULT = "6 6\n7 5\n7 8\n7 4\n8 3\n8 7\n8 
4\n9 2\n9 6\n";
 
        @Override
        protected void preSubmit() throws Exception {
@@ -84,7 +103,7 @@ public class CrossITCase extends RecordAPITestBase {
                private IntValue integer = new IntValue();
                
                @Override
-               public void cross(Record record1, Record record2, 
Collector<Record> out) {
+               public Record cross(Record record1, Record record2) throws 
Exception {
                        string = record1.getField(1, string);
                        int val1 = Integer.parseInt(string.toString());
                        string = record2.getField(1, string);
@@ -95,16 +114,14 @@ public class CrossITCase extends RecordAPITestBase {
                        int key2 = Integer.parseInt(string.toString());
                        
                        LOG.debug("Processing { [" + key1 + "," + val1 + "] , 
[" + key2 + "," + val2 + "] }");
-                       
-                       if (val1 + val2 <= 6) {
-                               string.setValue((key1 + key2 + 2) + "");
-                               integer.setValue(val2 - val1 + 1);
-                               
-                               record1.setField(0, string);
-                               record1.setField(1, integer);
-                               
-                               out.collect(record1);
-                       }
+
+                       string.setValue((key1 + key2 + 2) + "");
+                       integer.setValue(val2 - val1 + 1);
+
+                       record1.setField(0, string);
+                       record1.setField(1, integer);
+
+                       return record1;
                }
 
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
index a545e05..0f58d18 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/kmeans/udfs/ComputeDistance.java
@@ -47,19 +47,19 @@ public class ComputeDistance extends CrossFunction 
implements Serializable {
         * 3: distance
         */
        @Override
-       public void cross(Record dataPointRecord, Record clusterCenterRecord, 
Collector<Record> out) {
-               
+       public Record cross(Record dataPointRecord, Record clusterCenterRecord) 
throws Exception {
+
                CoordVector dataPoint = dataPointRecord.getField(1, 
CoordVector.class);
-               
+
                IntValue clusterCenterId = clusterCenterRecord.getField(0, 
IntValue.class);
                CoordVector clusterPoint = clusterCenterRecord.getField(1, 
CoordVector.class);
-       
+
                
this.distance.setValue(dataPoint.computeEuclidianDistance(clusterPoint));
-               
-               // add cluster center id and distance to the data point record 
+
+               // add cluster center id and distance to the data point record
                dataPointRecord.setField(2, clusterCenterId);
                dataPointRecord.setField(3, this.distance);
-               
-               out.collect(dataPointRecord);
+
+               return dataPointRecord;
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
index 15640c0..441dc39 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recordJobs/relational/TPCHQuery3.java
@@ -89,7 +89,7 @@ public class TPCHQuery3 implements Program, 
ProgramDescription {
                /**
                 * Reads the filter literals from the configuration.
                 * 
-                * @see 
org.apache.flink.api.common.functions.Function#open(org.apache.flink.configuration.Configuration)
+                * @see 
org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)
                 */
                @Override
                public void open(Configuration parameters) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
index 97367f7..7149cd3 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
@@ -24,8 +24,8 @@ import java.util.Collection;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.Program;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
@@ -170,7 +170,7 @@ public class KMeansForTest implements Program {
        // 
*************************************************************************
 
        /** Converts a Tuple2<Double,Double> into a Point. */
-       public static final class TuplePointConverter extends 
MapFunction<Tuple2<Double, Double>, Point> {
+       public static final class TuplePointConverter extends 
RichMapFunction<Tuple2<Double, Double>, Point> {
 
                @Override
                public Point map(Tuple2<Double, Double> t) throws Exception {
@@ -179,7 +179,7 @@ public class KMeansForTest implements Program {
        }
 
        /** Converts a Tuple3<Integer, Double,Double> into a Centroid. */
-       public static final class TupleCentroidConverter extends 
MapFunction<Tuple3<Integer, Double, Double>, Centroid> {
+       public static final class TupleCentroidConverter extends 
RichMapFunction<Tuple3<Integer, Double, Double>, Centroid> {
 
                @Override
                public Centroid map(Tuple3<Integer, Double, Double> t) throws 
Exception {
@@ -188,7 +188,7 @@ public class KMeansForTest implements Program {
        }
 
        /** Determines the closest cluster center for a data point. */
-       public static final class SelectNearestCenter extends 
MapFunction<Point, Tuple2<Integer, Point>> {
+       public static final class SelectNearestCenter extends 
RichMapFunction<Point, Tuple2<Integer, Point>> {
                private Collection<Centroid> centroids;
 
                /** Reads the centroid values from a broadcast variable into a 
collection. */
@@ -236,7 +236,7 @@ public class KMeansForTest implements Program {
        }
 
        /** Appends a count variable to the tuple. */
-       public static final class CountAppender extends 
MapFunction<Tuple2<Integer, Point>, DummyTuple3IntPointLong> {
+       public static final class CountAppender extends 
RichMapFunction<Tuple2<Integer, Point>, DummyTuple3IntPointLong> {
 
                @Override
                public DummyTuple3IntPointLong map(Tuple2<Integer, Point> t) {
@@ -245,7 +245,7 @@ public class KMeansForTest implements Program {
        }
 
        /** Sums and counts point coordinates. */
-       public static final class CentroidAccumulator extends 
ReduceFunction<DummyTuple3IntPointLong> {
+       public static final class CentroidAccumulator extends 
RichReduceFunction<DummyTuple3IntPointLong> {
 
                @Override
                public DummyTuple3IntPointLong reduce(DummyTuple3IntPointLong 
val1, DummyTuple3IntPointLong val2) {
@@ -254,7 +254,7 @@ public class KMeansForTest implements Program {
        }
 
        /** Computes new centroid from coordinate sum and count of points. */
-       public static final class CentroidAverager extends 
MapFunction<DummyTuple3IntPointLong, Centroid> {
+       public static final class CentroidAverager extends 
RichMapFunction<DummyTuple3IntPointLong, Centroid> {
 
                @Override
                public Centroid map(DummyTuple3IntPointLong value) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7e0f153..8058539 100644
--- a/pom.xml
+++ b/pom.xml
@@ -274,6 +274,9 @@ under the License.
                                        <activeByDefault>false</activeByDefault>
                                        <jdk>1.8</jdk>
                                </activation>
+                <modules>
+                    <module>flink-java8-tests</module>
+                </modules>
                                <build>
                                        <plugins>
                                                <plugin>

Reply via email to