http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java
index 265ce75..c0ea26a 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesOpt.java
@@ -22,12 +22,12 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 import org.apache.flink.api.java.DataSet;
@@ -134,7 +134,7 @@ public class EnumTrianglesOpt {
        // 
*************************************************************************
        
        /** Converts a Tuple2 into an Edge */
-       public static class TupleEdgeConverter extends 
MapFunction<Tuple2<Integer, Integer>, Edge> {
+       public static class TupleEdgeConverter implements 
MapFunction<Tuple2<Integer, Integer>, Edge> {
                private final Edge outEdge = new Edge();
                
                @Override
@@ -145,7 +145,7 @@ public class EnumTrianglesOpt {
        }
        
        /** Emits for an edge the original edge and its switched version. */
-       private static class EdgeDuplicator extends FlatMapFunction<Edge, Edge> 
{
+       private static class EdgeDuplicator implements FlatMapFunction<Edge, 
Edge> {
                
                @Override
                public void flatMap(Edge edge, Collector<Edge> out) throws 
Exception {
@@ -160,7 +160,7 @@ public class EnumTrianglesOpt {
         * Emits one edge for each input edge with a degree annotation for the 
shared vertex.
         * For each emitted edge, the first vertex is the vertex with the 
smaller id.
         */
-       private static class DegreeCounter extends GroupReduceFunction<Edge, 
EdgeWithDegrees> {
+       private static class DegreeCounter implements GroupReduceFunction<Edge, 
EdgeWithDegrees> {
                
                final ArrayList<Integer> otherVertices = new 
ArrayList<Integer>();
                final EdgeWithDegrees outputEdge = new EdgeWithDegrees();
@@ -208,7 +208,7 @@ public class EnumTrianglesOpt {
         * Builds an edge with degree annotation from two edges that have the 
same vertices and only one 
         * degree annotation.
         */
-       private static class DegreeJoiner extends 
ReduceFunction<EdgeWithDegrees> {
+       private static class DegreeJoiner implements 
ReduceFunction<EdgeWithDegrees> {
                private final EdgeWithDegrees outEdge = new EdgeWithDegrees();
                
                @Override
@@ -228,7 +228,7 @@ public class EnumTrianglesOpt {
        }
                
        /** Projects an edge (pair of vertices) such that the first vertex is 
the vertex with the smaller degree. */
-       private static class EdgeByDegreeProjector extends 
MapFunction<EdgeWithDegrees, Edge> {
+       private static class EdgeByDegreeProjector implements 
MapFunction<EdgeWithDegrees, Edge> {
                
                private final Edge outEdge = new Edge();
                
@@ -249,7 +249,7 @@ public class EnumTrianglesOpt {
        }
        
        /** Projects an edge (pair of vertices) such that the id of the first 
is smaller than the id of the second. */
-       private static class EdgeByIdProjector extends MapFunction<Edge, Edge> {
+       private static class EdgeByIdProjector implements MapFunction<Edge, 
Edge> {
        
                @Override
                public Edge map(Edge inEdge) throws Exception {
@@ -268,7 +268,7 @@ public class EnumTrianglesOpt {
         *  The first vertex of a triad is the shared vertex, the second and 
third vertex are ordered by vertexId. 
         *  Assumes that input edges share the first vertex and are in 
ascending order of the second vertex.
         */
-       private static class TriadBuilder extends GroupReduceFunction<Edge, 
Triad> {
+       private static class TriadBuilder implements GroupReduceFunction<Edge, 
Triad> {
                
                private final List<Integer> vertices = new ArrayList<Integer>();
                private final Triad outTriad = new Triad();
@@ -300,7 +300,7 @@ public class EnumTrianglesOpt {
        }
        
        /** Filters triads (three vertices connected by two edges) without a 
closing third edge. */
-       private static class TriadFilter extends JoinFunction<Triad, Edge, 
Triad> {
+       private static class TriadFilter implements JoinFunction<Triad, Edge, 
Triad> {
                
                @Override
                public Triad join(Triad triad, Edge edge) throws Exception {
@@ -332,7 +332,7 @@ public class EnumTrianglesOpt {
                        System.out.println("Executing Enum Triangles Opt 
example with built-in default data.");
                        System.out.println("  Provide parameters to read input 
data from files.");
                        System.out.println("  See the documentation for the 
correct format of input files.");
-                       System.out.println("  Usage: EnumTriangleBasic <edge 
path> <result path>");
+                       System.out.println("  Usage: EnumTriangleOpt <edge 
path> <result path>");
                }
                return true;
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
index 18eba5d..ba9754f 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
@@ -23,10 +23,10 @@ import static 
org.apache.flink.api.java.aggregation.Aggregations.SUM;
 import java.util.ArrayList;
 import java.util.Iterator;
 
-import org.apache.flink.api.java.functions.FilterFunction;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -139,7 +139,7 @@ public class PageRankBasic {
        /** 
         * A map function that assigns an initial rank to all pages. 
         */
-       public static final class RankAssigner extends 
MapFunction<Tuple1<Long>, Tuple2<Long, Double>> {
+       public static final class RankAssigner implements 
MapFunction<Tuple1<Long>, Tuple2<Long, Double>> {
                Tuple2<Long, Double> outPageWithRank;
                
                public RankAssigner(double rank) {
@@ -158,7 +158,7 @@ public class PageRankBasic {
         * originate. Run as a pre-processing step.
         */
        @ConstantFields("0")
-       public static final class BuildOutgoingEdgeList extends 
GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
+       public static final class BuildOutgoingEdgeList implements 
GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
                
                private final ArrayList<Long> neighbors = new ArrayList<Long>();
                
@@ -179,7 +179,7 @@ public class PageRankBasic {
        /**
         * Join function that distributes a fraction of a vertex's rank to all 
neighbors.
         */
-       public static final class JoinVertexWithEdgesMatch extends 
FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, 
Tuple2<Long, Double>> {
+       public static final class JoinVertexWithEdgesMatch implements 
FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, 
Tuple2<Long, Double>> {
 
                @Override
                public void flatMap(Tuple2<Tuple2<Long, Double>, Tuple2<Long, 
Long[]>> value, Collector<Tuple2<Long, Double>> out){
@@ -197,7 +197,7 @@ public class PageRankBasic {
         * The function that applies the page rank dampening formula
         */
        @ConstantFields("0")
-       public static final class Dampener extends 
MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
+       public static final class Dampener implements 
MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
 
                private final double dampening;
                private final double randomJump;
@@ -217,7 +217,7 @@ public class PageRankBasic {
        /**
         * Filter that filters vertices where the rank difference is below a 
threshold.
         */
-       public static final class EpsilonFilter extends 
FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
+       public static final class EpsilonFilter implements 
FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
 
                @Override
                public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, 
Double>> value) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
index d8d8b62..22054da 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java
@@ -19,6 +19,8 @@
 
 package org.apache.flink.example.java.graph;
 
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.IterativeDataSet;
@@ -26,8 +28,6 @@ import org.apache.flink.api.java.IterativeDataSet;
 import java.util.Iterator;
 
 import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.example.java.graph.util.ConnectedComponentsData;
 import org.apache.flink.util.Collector;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java
index 1d687f3..0868732 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java
@@ -22,8 +22,9 @@ package org.apache.flink.example.java.ml;
 import java.io.Serializable;
 import java.util.Collection;
 
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.example.java.ml.util.LinearRegressionData;
@@ -183,7 +184,7 @@ public class LinearRegression {
        // 
*************************************************************************
 
        /** Converts a Tuple2<Double,Double> into a Data. */
-       public static final class TupleDataConverter extends 
MapFunction<Tuple2<Double, Double>, Data> {
+       public static final class TupleDataConverter implements 
MapFunction<Tuple2<Double, Double>, Data> {
 
                @Override
                public Data map(Tuple2<Double, Double> t) throws Exception {
@@ -192,7 +193,7 @@ public class LinearRegression {
        }
 
        /** Converts a Tuple2<Double,Double> into a Params. */
-       public static final class TupleParamsConverter extends 
MapFunction<Tuple2<Double, Double>,Params> {
+       public static final class TupleParamsConverter implements 
MapFunction<Tuple2<Double, Double>,Params> {
 
                @Override
                public Params map(Tuple2<Double, Double> t)throws Exception {
@@ -203,7 +204,7 @@ public class LinearRegression {
        /**
         * Compute a single BGD type update for every parameters.
         */
-       public static class SubUpdate extends 
MapFunction<Data,Tuple2<Params,Integer>>{
+       public static class SubUpdate extends 
RichMapFunction<Data,Tuple2<Params,Integer>> {
 
                private Collection<Params> parameters; 
 
@@ -234,7 +235,7 @@ public class LinearRegression {
        /**  
         * Accumulator all the update.
         * */
-       public static class UpdateAccumulator extends 
ReduceFunction<Tuple2<Params, Integer>> {
+       public static class UpdateAccumulator implements 
ReduceFunction<Tuple2<Params, Integer>> {
 
                @Override
                public Tuple2<Params, Integer> reduce(Tuple2<Params, Integer> 
val1, Tuple2<Params, Integer> val2) {
@@ -250,7 +251,7 @@ public class LinearRegression {
        /**
         * Compute the final update by average them.
         */
-       public static class Update extends MapFunction<Tuple2<Params, 
Integer>,Params>{
+       public static class Update implements MapFunction<Tuple2<Params, 
Integer>,Params> {
 
                @Override
                public Params map(Tuple2<Params, Integer> arg0) throws 
Exception {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java
index 61b90dd..4bced17 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.FilterFunction;
+import org.apache.flink.api.java.functions.RichFilterFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
@@ -149,7 +149,7 @@ public class EmptyFieldsCountAccumulator {
         * In doing so, it also counts the number of empty fields per attribute 
with an accumulator (registered under 
         * {@link EmptyFieldsCountAccumulator#EMPTY_FIELD_ACCUMULATOR}).
         */
-       public static final class EmptyFieldFilter extends 
FilterFunction<Tuple> {
+       public static final class EmptyFieldFilter extends 
RichFilterFunction<Tuple> {
 
                // create a new accumulator in each filter function instance
                // accumulators can be merged later on

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/RelationalQuery.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/RelationalQuery.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/RelationalQuery.java
index 48cdedc..08a261c 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/RelationalQuery.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/RelationalQuery.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.example.java.relational;
 
+import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FilterFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery10.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery10.java
index ef03e6f..1ff6583 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery10.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery10.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.example.java.relational;
 
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FilterFunction;
-import org.apache.flink.api.java.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery3.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery3.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery3.java
index 52109ea..4544fd4 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery3.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery3.java
@@ -25,9 +25,9 @@ import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.Date;
 
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FilterFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
index 2649d24..3033c0d 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
@@ -21,8 +21,8 @@ package org.apache.flink.example.java.relational;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.java.functions.CoGroupFunction;
-import org.apache.flink.api.java.functions.FilterFunction;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -159,7 +159,7 @@ public class WebLogAnalysis {
         * MapFunction that filters for documents that contain a certain set of
         * keywords.
         */
-       public static class FilterDocByKeyWords extends 
FilterFunction<Tuple2<String, String>> {
+       public static class FilterDocByKeyWords implements 
FilterFunction<Tuple2<String, String>> {
 
                private static final String[] KEYWORDS = { " editors ", " 
oscillations " };
 
@@ -187,7 +187,7 @@ public class WebLogAnalysis {
        /**
         * MapFunction that filters for records where the rank exceeds a 
certain threshold.
         */
-       public static class FilterByRank extends FilterFunction<Tuple3<Integer, 
String, Integer>> {
+       public static class FilterByRank implements 
FilterFunction<Tuple3<Integer, String, Integer>> {
 
                private static final int RANKFILTER = 40;
 
@@ -210,7 +210,7 @@ public class WebLogAnalysis {
         * MapFunction that filters for records of the visits relation where 
the year
         * (from the date string) is equal to a certain value.
         */
-       public static class FilterVisitsByDate extends 
FilterFunction<Tuple2<String, String>> {
+       public static class FilterVisitsByDate implements 
FilterFunction<Tuple2<String, String>> {
 
                private static final int YEARFILTER = 2007;
 
@@ -237,7 +237,7 @@ public class WebLogAnalysis {
         * If the first input does not provide any pairs, all pairs of the 
second input are emitted.
         * Otherwise, no pair is emitted.
         */
-       public static class AntiJoinVisits extends 
CoGroupFunction<Tuple3<Integer, String, Integer>, Tuple1<String>, 
Tuple3<Integer, String, Integer>> {
+       public static class AntiJoinVisits implements 
CoGroupFunction<Tuple3<Integer, String, Integer>, Tuple1<String>, 
Tuple3<Integer, String, Integer>> {
 
                /**
                 * If the visit iterator is empty, all pairs of the rank 
iterator are emitted.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCount.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCount.java
index a18abcb..3e95ccd 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCount.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCount.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.example.java.wordcount;
 
-import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 import org.apache.flink.api.java.DataSet;
@@ -91,7 +91,7 @@ public class WordCount {
         * FlatMapFunction. The function takes a line (String) and splits it 
into 
         * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
         */
-       public static final class Tokenizer extends FlatMapFunction<String, 
Tuple2<String, Integer>> {
+       public static final class Tokenizer implements FlatMapFunction<String, 
Tuple2<String, Integer>> {
 
                @Override
                public void flatMap(String value, Collector<Tuple2<String, 
Integer>> out) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCountPOJO.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCountPOJO.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCountPOJO.java
index 32a8997..04810a1 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCountPOJO.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCountPOJO.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.example.java.wordcount;
 
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -111,7 +111,7 @@ public class WordCountPOJO {
         * FlatMapFunction. The function takes a line (String) and splits it 
into
         * multiple WC POJOs as "(word, 1)".
         */
-       public static final class Tokenizer extends FlatMapFunction<String, WC> 
{
+       public static final class Tokenizer implements FlatMapFunction<String, 
WC> {
 
                @Override
                public void flatMap(String value, Collector<WC> out) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/pom.xml b/flink-examples/pom.xml
index 917461a..ae46681 100644
--- a/flink-examples/pom.xml
+++ b/flink-examples/pom.xml
@@ -50,5 +50,4 @@ under the License.
                <module>flink-java-examples</module>
                <module>flink-scala-examples</module>
        </modules>
-
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 894880e..e7199f9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -19,42 +19,39 @@
 package org.apache.flink.api.java;
 
 import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.CoGroupFunction;
-import org.apache.flink.api.java.functions.FilterFunction;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import 
org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.io.PrintingOutputFormat;
 import org.apache.flink.api.java.io.TextOutputFormat;
 import org.apache.flink.api.java.operators.AggregateOperator;
 import org.apache.flink.api.java.operators.CoGroupOperator;
+import org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets;
 import org.apache.flink.api.java.operators.CrossOperator;
+import org.apache.flink.api.java.operators.CrossOperator.DefaultCross;
 import org.apache.flink.api.java.operators.CustomUnaryOperation;
 import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.api.java.operators.DistinctOperator;
 import org.apache.flink.api.java.operators.FilterOperator;
 import org.apache.flink.api.java.operators.FlatMapOperator;
-import org.apache.flink.api.java.operators.Grouping;
-import org.apache.flink.api.java.operators.JoinOperator;
+import org.apache.flink.api.java.operators.JoinOperator.JoinHint;
+import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.operators.MapOperator;
-import org.apache.flink.api.java.operators.ProjectOperator;
-import org.apache.flink.api.java.operators.ReduceGroupOperator;
+import org.apache.flink.api.java.operators.ProjectOperator.Projection;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.ReduceOperator;
-import org.apache.flink.api.java.operators.SortedGrouping;
 import org.apache.flink.api.java.operators.UnionOperator;
 import org.apache.flink.api.java.operators.UnsortedGrouping;
-import org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets;
-import org.apache.flink.api.java.operators.CrossOperator.DefaultCross;
-import org.apache.flink.api.java.operators.JoinOperator.JoinHint;
-import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets;
-import org.apache.flink.api.java.operators.ProjectOperator.Projection;
 import org.apache.flink.api.java.record.functions.CrossFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -68,8 +65,8 @@ import org.apache.flink.types.TypeInformation;
  * A DataSet represents a collection of elements of the same type.<br/>
  * A DataSet can be transformed into another DataSet by applying a 
transformation as for example 
  * <ul>
- *   <li>{@link DataSet#map(MapFunction)},</li>
- *   <li>{@link DataSet#reduce(ReduceFunction)},</li>
+ *   <li>{@link 
DataSet#map(org.apache.flink.api.java.functions.RichMapFunction)},</li>
+ *   <li>{@link 
DataSet#reduce(org.apache.flink.api.java.functions.RichReduceFunction)},</li>
  *   <li>{@link DataSet#join(DataSet)}, or</li>
  *   <li>{@link DataSet#coGroup(DataSet)}.</li>
  * </ul>
@@ -124,13 +121,13 @@ public abstract class DataSet<T> {
        
        /**
         * Applies a Map transformation on a {@link DataSet}.<br/>
-        * The transformation calls a {@link MapFunction} for each element of 
the DataSet.
+        * The transformation calls a {@link 
org.apache.flink.api.java.functions.RichMapFunction} for each element of the 
DataSet.
         * Each MapFunction call returns exactly one element.
         * 
         * @param mapper The MapFunction that is called for each element of the 
DataSet.
         * @return A MapOperator that represents the transformed DataSet.
         * 
-        * @see MapFunction
+        * @see org.apache.flink.api.java.functions.RichMapFunction
         * @see MapOperator
         * @see DataSet
         */
@@ -138,18 +135,21 @@ public abstract class DataSet<T> {
                if (mapper == null) {
                        throw new NullPointerException("Map function must not 
be null.");
                }
+               if (FunctionUtils.isSerializedLambdaFunction(mapper)) {
+                       throw new UnsupportedLambdaExpressionException();
+               }
                return new MapOperator<T, R>(this, mapper);
        }
        
        /**
         * Applies a FlatMap transformation on a {@link DataSet}.<br/>
-        * The transformation calls a {@link FlatMapFunction} for each element 
of the DataSet.
+        * The transformation calls a {@link 
org.apache.flink.api.java.functions.RichFlatMapFunction} for each element of 
the DataSet.
         * Each FlatMapFunction call can return any number of elements 
including none.
         * 
         * @param flatMapper The FlatMapFunction that is called for each 
element of the DataSet. 
         * @return A FlatMapOperator that represents the transformed DataSet.
         * 
-        * @see FlatMapFunction
+        * @see org.apache.flink.api.java.functions.RichFlatMapFunction
         * @see FlatMapOperator
         * @see DataSet
         */
@@ -157,19 +157,22 @@ public abstract class DataSet<T> {
                if (flatMapper == null) {
                        throw new NullPointerException("FlatMap function must 
not be null.");
                }
+               if (FunctionUtils.isSerializedLambdaFunction(flatMapper)) {
+                       throw new UnsupportedLambdaExpressionException();
+               }
                return new FlatMapOperator<T, R>(this, flatMapper);
        }
        
        /**
         * Applies a Filter transformation on a {@link DataSet}.<br/>
-        * The transformation calls a {@link FilterFunction} for each element 
of the DataSet 
+        * The transformation calls a {@link 
org.apache.flink.api.java.functions.RichFilterFunction} for each element of the 
DataSet
         * and retains only those element for which the function returns true. 
Elements for 
         * which the function returns false are filtered. 
         * 
         * @param filter The FilterFunction that is called for each element of 
the DataSet.
         * @return A FilterOperator that represents the filtered DataSet.
         * 
-        * @see FilterFunction
+        * @see org.apache.flink.api.java.functions.RichFilterFunction
         * @see FilterOperator
         * @see DataSet
         */
@@ -179,6 +182,7 @@ public abstract class DataSet<T> {
                }
                return new FilterOperator<T>(this, filter);
        }
+
        
        // 
--------------------------------------------------------------------------------------------
        //  Projections
@@ -264,14 +268,14 @@ public abstract class DataSet<T> {
        
        /**
         * Applies a Reduce transformation on a non-grouped {@link 
DataSet}.<br/>
-        * The transformation consecutively calls a {@link ReduceFunction} 
+        * The transformation consecutively calls a {@link 
org.apache.flink.api.java.functions.RichReduceFunction}
         *   until only a single element remains which is the result of the 
transformation.
         * A ReduceFunction combines two elements into one new element of the 
same type.
         * 
         * @param reducer The ReduceFunction that is applied on the DataSet.
         * @return A ReduceOperator that represents the reduced DataSet.
         * 
-        * @see ReduceFunction
+        * @see org.apache.flink.api.java.functions.RichReduceFunction
         * @see ReduceOperator
         * @see DataSet
         */
@@ -284,24 +288,27 @@ public abstract class DataSet<T> {
        
        /**
         * Applies a GroupReduce transformation on a non-grouped {@link 
DataSet}.<br/>
-        * The transformation calls a {@link GroupReduceFunction} once with the 
full DataSet.
+        * The transformation calls a {@link 
org.apache.flink.api.java.functions.RichGroupReduceFunction} once with the full 
DataSet.
         * The GroupReduceFunction can iterate over all elements of the DataSet 
and emit any
         *   number of output elements including none.
         * 
         * @param reducer The GroupReduceFunction that is applied on the 
DataSet.
         * @return A GroupReduceOperator that represents the reduced DataSet.
         * 
-        * @see GroupReduceFunction
-        * @see ReduceGroupOperator
+        * @see org.apache.flink.api.java.functions.RichGroupReduceFunction
+        * @see org.apache.flink.api.java.operators.GroupReduceOperator
         * @see DataSet
         */
-       public <R> ReduceGroupOperator<T, R> reduceGroup(GroupReduceFunction<T, 
R> reducer) {
+       public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, 
R> reducer) {
                if (reducer == null) {
                        throw new NullPointerException("GroupReduce function 
must not be null.");
                }
-               return new ReduceGroupOperator<T, R>(this, reducer);
+               if (FunctionUtils.isSerializedLambdaFunction(reducer)) {
+                       throw new UnsupportedLambdaExpressionException();
+               }
+               return new GroupReduceOperator<T, R>(this, reducer);
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
        //  distinct
        // 
--------------------------------------------------------------------------------------------
@@ -359,8 +366,8 @@ public abstract class DataSet<T> {
         * <ul>
         *   <li>{@link UnsortedGrouping#sortGroup(int, 
org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}. 
         *   <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply 
an Aggregate transformation.
-        *   <li>{@link UnsortedGrouping#reduce(ReduceFunction)} to apply a 
Reduce transformation.
-        *   <li>{@link UnsortedGrouping#reduceGroup(GroupReduceFunction)} to 
apply a GroupReduce transformation.
+        *   <li>{@link 
UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)}
 to apply a Reduce transformation.
+        *   <li>{@link 
UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)}
 to apply a GroupReduce transformation.
         * </ul>
         *  
         * @param keyExtractor The KeySelector function which extracts the key 
values from the DataSet on which it is grouped. 
@@ -372,7 +379,7 @@ public abstract class DataSet<T> {
         * @see SortedGrouping
         * @see AggregateOperator
         * @see ReduceOperator
-        * @see ReduceGroupOperator
+        * @see org.apache.flink.api.java.operators.GroupReduceOperator
         * @see DataSet
         */
        public <K extends Comparable<K>> UnsortedGrouping<T> 
groupBy(KeySelector<T, K> keyExtractor) {
@@ -388,8 +395,8 @@ public abstract class DataSet<T> {
         * <ul>
         *   <li>{@link UnsortedGrouping#sortGroup(int, 
org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}. 
         *   <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply 
an Aggregate transformation.
-        *   <li>{@link UnsortedGrouping#reduce(ReduceFunction)} to apply a 
Reduce transformation.
-        *   <li>{@link UnsortedGrouping#reduceGroup(GroupReduceFunction)} to 
apply a GroupReduce transformation.
+        *   <li>{@link 
UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)}
 to apply a Reduce transformation.
+        *   <li>{@link 
UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)}
 to apply a GroupReduce transformation.
         * </ul> 
         * 
         * @param fields One or more field positions on which the DataSet will 
be grouped. 
@@ -401,7 +408,7 @@ public abstract class DataSet<T> {
         * @see SortedGrouping
         * @see AggregateOperator
         * @see ReduceOperator
-        * @see ReduceGroupOperator
+        * @see org.apache.flink.api.java.operators.GroupReduceOperator
         * @see DataSet
         */
        public UnsortedGrouping<T> groupBy(int... fields) {
@@ -417,8 +424,8 @@ public abstract class DataSet<T> {
         * <ul>
         *   <li>{@link UnsortedGrouping#sortGroup(int, 
org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}.
         *   <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply 
an Aggregate transformation.
-        *   <li>{@link UnsortedGrouping#reduce(ReduceFunction)} to apply a 
Reduce transformation.
-        *   <li>{@link UnsortedGrouping#reduceGroup(GroupReduceFunction)} to 
apply a GroupReduce transformation.
+        *   <li>{@link 
UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)}
 to apply a Reduce transformation.
+        *   <li>{@link 
UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)}
 to apply a GroupReduce transformation.
         * </ul>
         *
         * @param fields One or more field expressions on which the DataSet 
will be grouped.
@@ -430,7 +437,7 @@ public abstract class DataSet<T> {
         * @see SortedGrouping
         * @see AggregateOperator
         * @see ReduceOperator
-        * @see ReduceGroupOperator
+        * @see org.apache.flink.api.java.operators.GroupReduceOperator
         * @see DataSet
         */
        public UnsortedGrouping<T> groupBy(String... fields) {
@@ -461,7 +468,7 @@ public abstract class DataSet<T> {
        public <R> JoinOperatorSets<T, R> join(DataSet<R> other) {
                return new JoinOperatorSets<T, R>(this, other);
        }
-       
+
        /**
         * Initiates a Join transformation. <br/>
         * A Join transformation joins the elements of two 
@@ -514,7 +521,7 @@ public abstract class DataSet<T> {
         * Initiates a CoGroup transformation.<br/>
         * A CoGroup transformation combines the elements of
         *   two {@link DataSet DataSets} into one DataSet. It groups each 
DataSet individually on a key and 
-        *   gives groups of both DataSets with equal keys together into a 
{@link CoGroupFunction}.
+        *   gives groups of both DataSets with equal keys together into a 
{@link org.apache.flink.api.java.functions.RichCoGroupFunction}.
         *   If a DataSet has a group with no matching key in the other 
DataSet, the CoGroupFunction
         *   is called with an empty group for the non-existing group.</br>
         * The CoGroupFunction can iterate over the elements of both groups and 
return any number 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/DeltaIteration.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/DeltaIteration.java 
b/flink-java/src/main/java/org/apache/flink/api/java/DeltaIteration.java
index bb53a89..2f2eae0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DeltaIteration.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DeltaIteration.java
@@ -189,7 +189,7 @@ public class DeltaIteration<ST, WT> {
         * The value of an aggregator can be accessed in the next iteration.
         * <p>
         * Aggregators can be accessed inside a function via the
-        * {@link 
org.apache.flink.api.common.functions.AbstractFunction#getIterationRuntimeContext()}
 method.
+        * {@link 
org.apache.flink.api.common.functions.AbstractRichFunction#getIterationRuntimeContext()}
 method.
         * 
         * @param name The name under which the aggregator is registered.
         * @param aggregator The aggregator class.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index d00fb47..ebd1422 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -112,7 +112,7 @@ public abstract class ExecutionEnvironment {
         * individually override this value to use a specific degree of 
parallelism via
         * {@link Operator#setParallelism(int)}. Other operations may need to 
run with a different
         * degree of parallelism - for example calling
-        * {@link 
DataSet#reduce(org.apache.flink.api.java.functions.ReduceFunction)} over the 
entire
+        * {@link 
DataSet#reduce(org.apache.flink.api.java.functions.RichReduceFunction)} over 
the entire
         * set will insert eventually an operation that runs non-parallel 
(degree of parallelism of one).
         * 
         * @return The degree of parallelism used by operations, unless they 
override that value. This method
@@ -550,7 +550,7 @@ public abstract class ExecutionEnvironment {
         * The runtime will copy the files temporarily to a local cache, if 
needed.
         * <p>
         * The {@link org.apache.flink.api.common.functions.RuntimeContext} can 
be obtained inside UDFs via
-        * {@link 
org.apache.flink.api.common.functions.Function#getRuntimeContext()} and 
provides access 
+        * {@link 
org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and 
provides access to the
         * {@link org.apache.flink.api.common.cache.DistributedCache} via 
         * {@link 
org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
         * 
@@ -568,7 +568,7 @@ public abstract class ExecutionEnvironment {
         * The runtime will copy the files temporarily to a local cache, if 
needed.
         * <p>
         * The {@link org.apache.flink.api.common.functions.RuntimeContext} can 
be obtained inside UDFs via
-        * {@link 
org.apache.flink.api.common.functions.Function#getRuntimeContext()} and 
provides access 
+        * {@link 
org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and 
provides access to the
         * {@link org.apache.flink.api.common.cache.DistributedCache} via 
         * {@link 
org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
         * 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/IterativeDataSet.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/IterativeDataSet.java 
b/flink-java/src/main/java/org/apache/flink/api/java/IterativeDataSet.java
index 6443dbb..5dc0d2e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/IterativeDataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/IterativeDataSet.java
@@ -93,7 +93,7 @@ public class IterativeDataSet<T> extends 
SingleInputOperator<T, T, IterativeData
         * The value of an aggregator can be accessed in the next iteration.
         * <p>
         * Aggregators can be accessed inside a function via the
-        * {@link 
org.apache.flink.api.common.functions.AbstractFunction#getIterationRuntimeContext()}
 method.
+        * {@link 
org.apache.flink.api.common.functions.AbstractRichFunction#getIterationRuntimeContext()}
 method.
         * 
         * @param name The name under which the aggregator is registered.
         * @param aggregator The aggregator class.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/CoGroupFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/CoGroupFunction.java
deleted file mode 100644
index 201794a..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/CoGroupFunction.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.functions;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
-import org.apache.flink.util.Collector;
-
-/**
- * The abstract base class for CoGroup functions. CoGroup functions combine 
two data sets by first grouping each data set
- * after a key and then "joining" the groups by calling this function with the 
two sets for each key. 
- * If a key is present in only one of the two inputs, it may be that one of 
the groups is empty.
- * <p>
- * The basic syntax for using CoGoup on two data sets is as follows:
- * <pre><blockquote>
- * DataSet<X> set1 = ...;
- * DataSet<Y> set2 = ...;
- * 
- * 
set1.coGroup(set2).where(<key-definition>).equalTo(<key-definition>).with(new 
MyCoGroupFunction());
- * </blockquote></pre>
- * <p>
- * {@code set1} is here considered the first input, {@code set2} the second 
input.
- * The keys can be defined through tuple field positions or key extractors.
- * See {@link org.apache.flink.api.java.operators.Keys} for details.
- * <p>
- * Some keys may only be contained in one of the two original data sets. In 
that case, the CoGroup function is invoked
- * with in empty input for the side of the data set that did not contain 
elements with that specific key.
- * <p>
- * All functions need to be serializable, as defined in {@link 
java.io.Serializable}.
- * 
- * @param <IN1> The type of the elements in the first input.
- * @param <IN2> The type of the elements in the second input.
- * @param <OUT> The type of the result elements.
- */
-public abstract class CoGroupFunction<IN1, IN2, OUT> extends AbstractFunction 
implements GenericCoGrouper<IN1, IN2, OUT> {
-
-       private static final long serialVersionUID = 1L;
-       
-       
-       /**
-        * The core method of the CoGroupFunction. This method is called for 
each pair of groups that have the same
-        * key. The elements of the groups are returned by the respective 
iterators.
-        * 
-        * It is possible that one of the two groups is empty, in which case 
the respective iterator has no elements.
-        * 
-        * @param first The group from the first input.
-        * @param second The group from the second input.
-        * @param out The collector through which to return the result elements.
-        * 
-        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
-        *                   to fail and may trigger recovery.
-        */
-       @Override
-       public abstract void coGroup(Iterator<IN1> first, Iterator<IN2> second, 
Collector<OUT> out) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/CrossFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/CrossFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/CrossFunction.java
deleted file mode 100644
index 27907ec..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/CrossFunction.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.functions;
-
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericCrosser;
-import org.apache.flink.util.Collector;
-
-
-/**
- * The abstract base class for Cross functions. Cross functions build a 
Cartesian produce of their inputs
- * and call the function or each pair of elements.
- * They are a means of convenience and can be used to directly produce 
manipulate the
- * pair of elements, instead of having the operator build 2-tuples, and then 
using a
- * MapFunction over those 2-tuples.
- * <p>
- * The basic syntax for using Cross on two data sets is as follows:
- * <pre><blockquote>
- * DataSet<X> set1 = ...;
- * DataSet<Y> set2 = ...;
- * 
- * set1.cross(set2).with(new MyCrossFunction());
- * </blockquote></pre>
- * <p>
- * {@code set1} is here considered the first input, {@code set2} the second 
input.
- * <p>
- * All functions need to be serializable, as defined in {@link 
java.io.Serializable}.
- * 
- * @param <IN1> The type of the elements in the first input.
- * @param <IN2> The type of the elements in the second input.
- * @param <OUT> The type of the result elements.
- */
-public abstract class CrossFunction<IN1, IN2, OUT> extends AbstractFunction 
implements GenericCrosser<IN1, IN2, OUT>{
-       
-       private static final long serialVersionUID = 1L;
-       
-
-       /**
-        * The core method of the cross operation. The method will be invoked 
for each pair of elements
-        * in the Cartesian product.
-        * 
-        * @param first The element from the first input.
-        * @param second The element from the second input.
-        * @return The result element.
-        * 
-        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
-        *                   to fail and may trigger recovery.
-        */
-       public abstract OUT cross(IN1 first, IN2 second) throws Exception;
-       
-       
-       
-       /**
-        * This method only delegates calls to the {@link #cross(Object, 
Object)} method.
-        */
-       @Override
-       public final void cross(IN1 record1, IN2 record2, Collector<OUT> out) 
throws Exception {
-               out.collect(cross(record1, record2));
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FilterFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FilterFunction.java
deleted file mode 100644
index aac2086..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FilterFunction.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.functions;
-
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericFilter;
-
-/**
- * The abstract base class for Filter functions. A filter function take 
elements and evaluates a
- * predicate on them to decide whether to keep the element, or to discard it.
- * <p>
- * The basic syntax for using a FilterFunction is as follows:
- * <pre><blockquote>
- * DataSet<X> input = ...;
- * 
- * DataSet<X> result = input.filter(new MyFilterFunction());
- * </blockquote></pre>
- * <p>
- * Like all functions, the FilterFunction needs to be serializable, as defined 
in {@link java.io.Serializable}.
- * 
- * @param <T> The type of the filtered elements.
- */
-public abstract class FilterFunction<T> extends AbstractFunction implements 
GenericFilter<T> {
-       
-       private static final long serialVersionUID = 1L;
-       
-       /**
-        * The core method of the FilterFunction. The method is called for each 
element in the input,
-        * and determines whether the element should be kept or filtered out. 
If the method returns true,
-        * the element passes the filter and is kept, if the method returns 
false, the element is
-        * filtered out.
-        * 
-        * @param value The input value to be filtered.
-        * @return Flag to indicate whether to keep the value (true) or to 
discard it (false).
-        * 
-        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
-        *                   to fail and may trigger recovery.
-        */
-       @Override
-       public abstract boolean filter(T value) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapFunction.java
deleted file mode 100644
index f9c22cc..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapFunction.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.functions;
-
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericFlatMap;
-import org.apache.flink.util.Collector;
-
-/**
- * The abstract base class for flatMap functions. FlatMap functions take 
elements and transform them,
- * into zero, one, or more elements. Typical applications can be splitting 
elements, or unnesting lists
- * and arrays. Operations that produce multiple strictly one result element 
per input element can also
- * use the {@link MapFunction}.
- * <p>
- * The basic syntax for using a FlatMapFunction is as follows:
- * <pre><blockquote>
- * DataSet<X> input = ...;
- * 
- * DataSet<Y> result = input.flatMap(new MyFlatMapFunction());
- * </blockquote></pre>
- * <p>
- * Like all functions, the FlatMapFunction needs to be serializable, as 
defined in {@link java.io.Serializable}.
- * 
- * @param <IN> Type of the input elements.
- * @param <OUT> Type of the returned elements.
- */
-public abstract class FlatMapFunction<IN, OUT> extends AbstractFunction 
implements GenericFlatMap<IN, OUT> {
-
-       private static final long serialVersionUID = 1L;
-
-       /**
-        * The core method of the FlatMapFunction. Takes an element from the 
input data set and transforms
-        * it into zero, one, or more elements.
-        * 
-        * @param value The input value.
-        * @param out The collector for for emitting result values.
-        * 
-        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
-        *                   to fail and may trigger recovery.
-        */
-       @Override
-       public abstract void flatMap(IN value, Collector<OUT> out) throws 
Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
index 5cc8c12..012ab57 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
@@ -23,7 +23,7 @@ import java.util.Iterator;
 import org.apache.flink.util.Collector;
 
 /**
- * A variant of the {@link FlatMapFunction} that returns elements through an 
iterator, rather then
+ * A variant of the {@link RichFlatMapFunction} that returns elements through 
an iterator, rather than
  * through a collector. In all other respects, it behaves exactly like the 
FlatMapFunction.
  * <p>
  * The function needs to be serializable, as defined in {@link 
java.io.Serializable}.
@@ -31,7 +31,7 @@ import org.apache.flink.util.Collector;
  * @param <IN> Type of the input elements.
  * @param <OUT> Type of the returned elements.
  */
-public abstract class FlatMapIterator<IN, OUT> extends FlatMapFunction<IN, 
OUT> {
+public abstract class FlatMapIterator<IN, OUT> extends RichFlatMapFunction<IN, 
OUT> {
 
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
index 1015971..b94840f 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
@@ -64,15 +64,15 @@ import org.apache.flink.api.common.InvalidProgramException;
  * </b>
  * <p>
  * Be aware that some annotations should only be used for functions with as 
single input
- * ({@link MapFunction}, {@link ReduceFunction}) and some only for stubs with 
two inputs
- * ({@link CrossFunction}, {@link JoinFunction}, {@link CoGroupFunction}).
+ * ({@link RichMapFunction}, {@link RichReduceFunction}) and some only for 
stubs with two inputs
+ * ({@link RichCrossFunction}, {@link RichFlatJoinFunction}, {@link 
RichCoGroupFunction}).
  */
 public class FunctionAnnotation {
 
        /**
         * This annotation declares that a function leaves certain fields of 
its input values unmodified and
         * only "forwards" or "copies" them to the return value. The annotation 
is applicable to unary
-        * functions, like for example {@link MapFunction}, {@link 
ReduceFunction}, or {@link FlatMapFunction}.
+        * functions, like for example {@link RichMapFunction}, {@link 
RichReduceFunction}, or {@link RichFlatMapFunction}.
         * <p>
         * The following example illustrates a function that keeps the tuple's 
field zero constant:
         * <pre><blockquote>
@@ -103,7 +103,7 @@ public class FunctionAnnotation {
        /**
         * This annotation declares that a function leaves certain fields of 
its first input values unmodified and
         * only "forwards" or "copies" them to the return value. The annotation 
is applicable to binary
-        * functions, like for example {@link JoinFunction}, {@link 
CoGroupFunction}, or {@link CrossFunction}.
+        * functions, like for example {@link RichFlatJoinFunction}, {@link 
RichCoGroupFunction}, or {@link RichCrossFunction}.
         * <p>
         * The following example illustrates a join function that copies fields 
from the first and second input to the
         * return value:
@@ -135,7 +135,7 @@ public class FunctionAnnotation {
        /**
         * This annotation declares that a function leaves certain fields of 
its second input values unmodified and
         * only "forwards" or "copies" them to the return value. The annotation 
is applicable to binary
-        * functions, like for example {@link JoinFunction}, {@link 
CoGroupFunction}, or {@link CrossFunction}.
+        * functions, like for example {@link RichFlatJoinFunction}, {@link 
RichCoGroupFunction}, or {@link RichCrossFunction}.
         * <p>
         * The following example illustrates a join function that copies fields 
from the first and second input to the
         * return value:
@@ -167,7 +167,7 @@ public class FunctionAnnotation {
        /**
         * This annotation declares that a function changes certain fields of 
its input values, while leaving all
         * others unmodified and in place in the return value. The annotation 
is applicable to unary
-        * functions, like for example {@link MapFunction}, {@link 
ReduceFunction}, or {@link FlatMapFunction}.
+        * functions, like for example {@link RichMapFunction}, {@link 
RichReduceFunction}, or {@link RichFlatMapFunction}.
         * <p>
         * The following example illustrates that at the example of a Map 
function:
         * 
@@ -201,7 +201,7 @@ public class FunctionAnnotation {
        /**
         * This annotation declares that a function changes certain fields of 
its first input value, while leaving all
         * others unmodified and in place in the return value. The annotation 
is applicable to binary
-        * functions, like for example {@link JoinFunction}, {@link 
CoGroupFunction}, or {@link CrossFunction}.
+        * functions, like for example {@link RichFlatJoinFunction}, {@link 
RichCoGroupFunction}, or {@link RichCrossFunction}.
         * <p>
         * The following example illustrates a join function that copies fields 
from the first and second input to the
         * return value:
@@ -238,7 +238,7 @@ public class FunctionAnnotation {
        /**
         * This annotation declares that a function changes certain fields of 
its second input value, while leaving all
         * others unmodified and in place in the return value. The annotation 
is applicable to binary
-        * functions, like for example {@link JoinFunction}, {@link 
CoGroupFunction}, or {@link CrossFunction}.
+        * functions, like for example {@link RichFlatJoinFunction}, {@link 
RichCoGroupFunction}, or {@link RichCrossFunction}.
         * <p>
         * The following example illustrates a join function that copies fields 
from the first and second input to the
         * return value:

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceFunction.java
deleted file mode 100644
index 01ae9c1..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceFunction.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.functions;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericCombine;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
-import org.apache.flink.util.Collector;
-
-/**
- * The abstract base class for group reduce functions. Group reduce functions 
process groups of elements.
- * They may aggregate them to a single value, or produce multiple result 
values for each group.
- * <p>
- * For a reduce functions that works incrementally by combining always two 
elements, see 
- * {@link ReduceFunction}, called via {@link 
org.apache.flink.api.java.DataSet#reduce(ReduceFunction)}.
- * <p>
- * The basic syntax for using a grouped GroupReduceFunction is as follows:
- * <pre><blockquote>
- * DataSet<X> input = ...;
- * 
- * DataSet<X> result = input.groupBy(<key-definition>).reduceGroup(new 
MyGroupReduceFunction());
- * </blockquote></pre>
- * <p>
- * GroupReduceFunctions may be "combinable", in which case they can pre-reduce 
partial groups in order to
- * reduce the data volume early. See the {@link #combine(Iterator, Collector)} 
function for details.
- * <p>
- * Like all functions, the GroupReduceFunction needs to be serializable, as 
defined in {@link java.io.Serializable}.
- * 
- * @param <IN> Type of the elements that this function processes.
- * @param <OUT> The type of the elements returned by the user-defined function.
- */
-public abstract class GroupReduceFunction<IN, OUT> extends AbstractFunction 
implements GenericGroupReduce<IN, OUT>, GenericCombine<IN> {
-       
-       private static final long serialVersionUID = 1L;
-
-       /**
-        * Core method of the reduce function. It is called one per group of 
elements. If the reducer
-        * is not grouped, than the entire data set is considered one group.
-        * 
-        * @param values The iterator returning the group of values to be 
reduced.
-        * @param out The collector to emit the returned values.
-        * 
-        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
-        *                   to fail and may trigger recovery.
-        */
-       @Override
-       public abstract void reduce(Iterator<IN> values, Collector<OUT> out) 
throws Exception;
-       
-       /**
-        * The combine methods pre-reduces elements. It may be called on 
subsets of the data
-        * before the actual reduce function. This is often helpful to lower 
data volume prior
-        * to reorganizing the data in an expensive way, as might be required 
for the final
-        * reduce function.
-        * <p>
-        * This method is only ever invoked when the subclass of {@link 
GroupReduceFunction}
-        * adds the {@link Combinable} annotation, or if the <i>combinable</i> 
flag is set when defining
-        * the <i>reduceGroup<i> operation via
-        * {@link 
org.apache.flink.api.java.operators.ReduceGroupOperator#setCombinable(boolean)}.
-        * <p>
-        * Since the reduce function will be called on the result of this 
method, it is important that this
-        * method returns the same data type as it consumes. By default, this 
method only calls the
-        * {@link #reduce(Iterator, Collector)} method. If the behavior in the 
pre-reducing is different
-        * from the final reduce function (for example because the reduce 
function changes the data type),
-        * this method must be overwritten, or the execution will fail.
-        * 
-        * @param values The iterator returning the group of values to be 
reduced.
-        * @param out The collector to emit the returned values.
-        * 
-        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
-        *                   to fail and may trigger recovery.
-        */
-       @Override
-       public void combine(Iterator<IN> values, Collector<IN> out) throws 
Exception {
-               @SuppressWarnings("unchecked")
-               Collector<OUT> c = (Collector<OUT>) out;
-               reduce(values, c);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * This annotation can be added to classes that extend {@link 
GroupReduceFunction}, in oder to mark
-        * them as "combinable". The system may call the {@link 
GroupReduceFunction#combine(Iterator, Collector)}
-        * method on such functions, to pre-reduce the data before transferring 
it over the network to
-        * the actual group reduce operation.
-        * <p>
-        * Marking combinable functions as such is in general beneficial for 
performance.
-        */
-       @Retention(RetentionPolicy.RUNTIME)
-       @Target(ElementType.TYPE)
-       public static @interface Combinable {};
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
index 6cb397b..b363606 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
@@ -23,7 +23,7 @@ import java.util.Iterator;
 import org.apache.flink.util.Collector;
 
 
-public abstract class GroupReduceIterator<IN, OUT> extends 
GroupReduceFunction<IN, OUT> {
+public abstract class GroupReduceIterator<IN, OUT> extends 
RichGroupReduceFunction<IN, OUT> {
        
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/JoinFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/JoinFunction.java
deleted file mode 100644
index c78e6f3..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/JoinFunction.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.functions;
-
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericJoiner;
-import org.apache.flink.util.Collector;
-
-/**
- * The abstract base class for Join functions. Join functions combine two data 
sets by joining their
- * elements on specified keys and calling this function with each pair of 
joining elements.
- * By default, this follows strictly the semantics of an "inner join" in SQL.
- * the semantics are those of an "inner join", meaning that elements are 
filtered out
- * if their key is not contained in the other data set.
- * <p>
- * Per the semantics of an inner join, the function is 
- * <p>
- * The basic syntax for using Join on two data sets is as follows:
- * <pre><blockquote>
- * DataSet<X> set1 = ...;
- * DataSet<Y> set2 = ...;
- * 
- * set1.join(set2).where(<key-definition>).equalTo(<key-definition>).with(new 
MyJoinFunction());
- * </blockquote></pre>
- * <p>
- * {@code set1} is here considered the first input, {@code set2} the second 
input.
- * The keys can be defined through tuple field positions or key extractors.
- * See {@link org.apache.flink.api.java.operators.Keys} for details.
- * <p>
- * The Join function is actually not a necessary part of a join operation. If 
no JoinFunction is provided,
- * the result of the operation is a sequence of Tuple2, where the elements in 
the tuple are those that
- * the JoinFunction would have been invoked with.
- * <P>
- * Note: You can use a {@link CoGroupFunction} to perform an outer join.
- * <p>
- * All functions need to be serializable, as defined in {@link 
java.io.Serializable}.
- * 
- * @param <IN1> The type of the elements in the first input.
- * @param <IN2> The type of the elements in the second input.
- * @param <OUT> The type of the result elements.
- */
-public abstract class JoinFunction<IN1, IN2, OUT> extends AbstractFunction 
implements GenericJoiner<IN1, IN2, OUT> {
-
-       private static final long serialVersionUID = 1L;
-
-       /**
-        * The user-defined method for performing transformations after a join.
-        * The method is called with matching pairs of elements from the inputs.
-        * 
-        * @param first The element from first input.
-        * @param second The element from second input.
-        * @return The resulting element.
-        * 
-        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
-        *                   to fail and may trigger recovery.
-        */
-       public abstract OUT join(IN1 first, IN2 second) throws Exception;
-       
-       
-       /**
-        * The user-defined method for performing transformations after a join, 
for operations that
-        * produce zero elements, or more than one element.
-        * By default, this method delegates to the method {@link #join(Object, 
Object)}. If this method
-        * is overridden, that method will no longer be called.
-        * 
-        * @param value1 The element from first input.
-        * @param value2 The element from second input.
-        * @param out A collector to emit resulting element (zero, one, or 
many).
-        * 
-        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
-        *                   to fail and may trigger recovery.
-        */
-       @Override
-       public void join(IN1 value1, IN2 value2, Collector<OUT> out) throws 
Exception {
-               out.collect(join(value1, value2));
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/MapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/MapFunction.java 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/MapFunction.java
deleted file mode 100644
index 64aec2a..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/MapFunction.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.functions;
-
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericMap;
-
-/**
- * The abstract base class for Map functions. Map functions take elements and 
transform them,
- * element wise. A Map function always produces a single result element for 
each input element.
- * Typical applications are parsing elements, converting data types, or 
projecting out fields.
- * Operations that produce multiple result elements from a single input 
element can be implemented
- * using the {@link FlatMapFunction}.
- * <p>
- * The basic syntax for using a MapFunction is as follows:
- * <pre><blockquote>
- * DataSet<X> input = ...;
- * 
- * DataSet<Y> result = input.map(new MyMapFunction());
- * </blockquote></pre>
- * <p>
- * Like all functions, the MapFunction needs to be serializable, as defined in 
{@link java.io.Serializable}.
- * 
- * @param <IN> Type of the input elements.
- * @param <OUT> Type of the returned elements.
- */
-public abstract class MapFunction<IN, OUT> extends AbstractFunction implements 
GenericMap<IN, OUT> {
-
-       private static final long serialVersionUID = 1L;
-
-       /**
-        * The core method of the MapFunction. Takes an element from the input 
data set and transforms
-        * it into another element.
-        * 
-        * @param value The input value.
-        * @return The value produced by the map function from the input value.
-        * 
-        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
-        *                   to fail and may trigger recovery.
-        */
-       @Override
-       public abstract OUT map(IN value) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/ReduceFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/ReduceFunction.java
deleted file mode 100644
index aea6bf8..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/ReduceFunction.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.functions;
-
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericReduce;
-
-/**
- * The abstract base class for Reduce functions. Reduce functions combine 
groups of elements to
- * a single value, by taking always two elements and combining them into one. 
Reduce functions
- * may be used on entire data sets, or on grouped data sets. In the latter 
case, each group is reduced
- * individually.
- * <p>
- * For a reduce functions that work on an entire group at the same time (such 
as the 
- * MapReduce/Hadoop-style reduce), see {@link GroupReduceFunction}, called via
- * {@link org.apache.flink.api.java.DataSet#reduceGroup(GroupReduceFunction)}. 
In the general case,
- * ReduceFunctions are considered faster, because they allow the system to use 
hash-based
- * execution strategies.
- * <p>
- * The basic syntax for using a grouped ReduceFunction is as follows:
- * <pre><blockquote>
- * DataSet<X> input = ...;
- * 
- * DataSet<X> result = input.groupBy(<key-definition>).reduce(new 
MyReduceFunction());
- * </blockquote></pre>
- * <p>
- * Like all functions, the ReduceFunction needs to be serializable, as defined 
in {@link java.io.Serializable}.
- * 
- * @param <T> Type of the elements that this function processes.
- */
-public abstract class ReduceFunction<T> extends AbstractFunction implements 
GenericReduce<T> {
-       
-       private static final long serialVersionUID = 1L;
-
-       /**
-        * The core method of the ReduceFunction, combining two values into one 
value of the same type.
-        * The reduce function is consecutively applied to all values of a 
group until only a single value remains.
-        *
-        * @param value1 The first value to combine.
-        * @param value2 The second value to combine.
-        * @return The combined value of both input values.
-        *
-        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
-        *                   to fail and may trigger recovery.
-        */
-       public abstract T reduce(T value1, T value2) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
new file mode 100644
index 0000000..8aaaf86
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import java.util.Iterator;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * The abstract base class for CoGroup functions. CoGroup functions combine 
two data sets by first grouping each data set
+ * after a key and then "joining" the groups by calling this function with the 
two sets for each key. 
+ * If a key is present in only one of the two inputs, it may be that one of 
the groups is empty.
+ * <p>
+ * The basic syntax for using CoGroup on two data sets is as follows:
+ * <pre><blockquote>
+ * DataSet<X> set1 = ...;
+ * DataSet<Y> set2 = ...;
+ * 
+ * 
set1.coGroup(set2).where(<key-definition>).equalTo(<key-definition>).with(new 
MyCoGroupFunction());
+ * </blockquote></pre>
+ * <p>
+ * {@code set1} is here considered the first input, {@code set2} the second 
input.
+ * The keys can be defined through tuple field positions or key extractors.
+ * See {@link org.apache.flink.api.java.operators.Keys} for details.
+ * <p>
+ * Some keys may only be contained in one of the two original data sets. In 
that case, the CoGroup function is invoked
+ * with an empty input for the side of the data set that did not contain 
elements with that specific key.
+ * <p>
+ * All functions need to be serializable, as defined in {@link 
java.io.Serializable}.
+ * 
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
+ */
+public abstract class RichCoGroupFunction<IN1, IN2, OUT> extends 
AbstractRichFunction implements CoGroupFunction<IN1, IN2, OUT> {
+
+       private static final long serialVersionUID = 1L;
+       
+       
+       /**
+        * The core method of the CoGroupFunction. This method is called for 
each pair of groups that have the same
+        * key. The elements of the groups are returned by the respective 
iterators.
+        * 
+        * It is possible that one of the two groups is empty, in which case 
the respective iterator has no elements.
+        * 
+        * @param first The group from the first input.
+        * @param second The group from the second input.
+        * @param out The collector through which to return the result elements.
+        * 
+        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+        *                   to fail and may trigger recovery.
+        */
+       @Override
+       public abstract void coGroup(Iterator<IN1> first, Iterator<IN2> second, 
Collector<OUT> out) throws Exception;
+
+}

Reply via email to