http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 5ca1068..a07a157 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -22,8 +22,10 @@ import java.security.InvalidParameterException;
 import java.util.Arrays;
 
 import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.GenericJoiner;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.Operator;
@@ -32,19 +34,22 @@ import 
org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.DeltaIteration.SolutionSetPlaceHolder;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichFlatJoinFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
+import 
org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 import org.apache.flink.api.java.operators.Keys.FieldPositionKeys;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import 
org.apache.flink.api.java.operators.translation.PlanUnwrappingJoinOperator;
 import 
org.apache.flink.api.java.operators.translation.TupleKeyExtractingMapper;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.types.TypeInformation;
 
 //CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
 import org.apache.flink.api.java.tuple.*;
+import org.apache.flink.util.Collector;
 //CHECKSTYLE.ON: AvoidStarImport
 
 /**
@@ -147,12 +152,12 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
         * @param <I2> The type of the second input DataSet of the Join 
transformation.
         * @param <OUT> The type of the result of the Join transformation.
         * 
-        * @see JoinFunction
+        * @see org.apache.flink.api.java.functions.RichFlatJoinFunction
         * @see DataSet
         */
        public static class EquiJoin<I1, I2, OUT> extends JoinOperator<I1, I2, 
OUT> {
                
-               private final JoinFunction<I1, I2, OUT> function;
+               private final FlatJoinFunction<I1, I2, OUT> function;
                
                @SuppressWarnings("unused")
                private boolean preserve1;
@@ -160,7 +165,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                private boolean preserve2;
                
                protected EquiJoin(DataSet<I1> input1, DataSet<I2> input2, 
-                               Keys<I1> keys1, Keys<I2> keys2, 
JoinFunction<I1, I2, OUT> function,
+                               Keys<I1> keys1, Keys<I2> keys2, 
FlatJoinFunction<I1, I2, OUT> function,
                                TypeInformation<OUT> returnType, JoinHint hint)
                {
                        super(input1, input2, keys1, keys2, returnType, hint);
@@ -171,14 +176,33 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                        
                        this.function = function;
 
-                       if (!(function instanceof ProjectJoinFunction)) {
+                       if (!(function instanceof ProjectFlatJoinFunction)) {
                                
extractSemanticAnnotationsFromUdf(function.getClass());
                        } else {
-                               
generateProjectionProperties(((ProjectJoinFunction<?, ?, ?>) function));
+                               
generateProjectionProperties(((ProjectFlatJoinFunction<?, ?, ?>) function));
                        }
                }
 
-               public void generateProjectionProperties(ProjectJoinFunction<?, 
?, ?> pjf) {
+               protected EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
+                               Keys<I1> keys1, Keys<I2> keys2, 
FlatJoinFunction<I1, I2, OUT> generatedFunction, JoinFunction<I1, I2, OUT> 
function,
+                               TypeInformation<OUT> returnType, JoinHint hint)
+               {
+                       super(input1, input2, keys1, keys2, returnType, hint);
+
+                       if (function == null) {
+                               throw new NullPointerException();
+                       }
+
+                       this.function = generatedFunction;
+
+                       if (!(generatedFunction instanceof 
ProjectFlatJoinFunction)) {
+                               
extractSemanticAnnotationsFromUdf(function.getClass());
+                       } else {
+                               
generateProjectionProperties(((ProjectFlatJoinFunction<?, ?, ?>) 
generatedFunction));
+                       }
+               }
+
+               public void 
generateProjectionProperties(ProjectFlatJoinFunction<?, ?, ?> pjf) {
                        DualInputSemanticProperties props = 
SemanticPropUtil.createProjectionPropertiesDual(pjf.getFields(), 
pjf.getIsFromFirst());
                        setSemanticProperties(props);
                }
@@ -238,8 +262,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                                int[] logicalKeyPositions1 = 
super.keys1.computeLogicalKeyPositions();
                                int[] logicalKeyPositions2 = 
super.keys2.computeLogicalKeyPositions();
                                
-                               JoinOperatorBase<I1, I2, OUT, GenericJoiner<I1, 
I2, OUT>> po =
-                                               new JoinOperatorBase<I1, I2, 
OUT, GenericJoiner<I1, I2, OUT>>(function,
+                               JoinOperatorBase<I1, I2, OUT, 
FlatJoinFunction<I1, I2, OUT>> po =
+                                               new JoinOperatorBase<I1, I2, 
OUT, FlatJoinFunction<I1, I2, OUT>>(function,
                                                                new 
BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), 
getResultType()),
                                                                
logicalKeyPositions1, logicalKeyPositions2,
                                                                name);
@@ -298,7 +322,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                
                private static <I1, I2, K, OUT> PlanUnwrappingJoinOperator<I1, 
I2, OUT, K> translateSelectorFunctionJoin(
                                Keys.SelectorFunctionKeys<I1, ?> rawKeys1, 
Keys.SelectorFunctionKeys<I2, ?> rawKeys2, 
-                               JoinFunction<I1, I2, OUT> function, 
+                               FlatJoinFunction<I1, I2, OUT> function,
                                TypeInformation<I1> inputType1, 
TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name,
                                Operator<I1> input1, Operator<I2> input2)
                {
@@ -313,10 +337,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                        final KeyExtractingMapper<I1, K> extractor1 = new 
KeyExtractingMapper<I1, K>(keys1.getKeyExtractor());
                        final KeyExtractingMapper<I2, K> extractor2 = new 
KeyExtractingMapper<I2, K>(keys2.getKeyExtractor());
 
-                       final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, 
Tuple2<K, I1>>> keyMapper1 =
-                                       new MapOperatorBase<I1, Tuple2<K, I1>, 
GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, 
Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
-                       final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, 
Tuple2<K, I2>>> keyMapper2 =
-                                       new MapOperatorBase<I2, Tuple2<K, I2>, 
GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, 
Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
+                       final MapOperatorBase<I1, Tuple2<K, I1>, 
MapFunction<I1, Tuple2<K, I1>>> keyMapper1 =
+                                       new MapOperatorBase<I1, Tuple2<K, I1>, 
MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, 
Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
+                       final MapOperatorBase<I2, Tuple2<K, I2>, 
MapFunction<I2, Tuple2<K, I2>>> keyMapper2 =
+                                       new MapOperatorBase<I2, Tuple2<K, I2>, 
MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, 
Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
                        final PlanUnwrappingJoinOperator<I1, I2, OUT, K> join = 
new PlanUnwrappingJoinOperator<I1, I2, OUT, K>(function, keys1, keys2, name, 
outputType, typeInfoWithKey1, typeInfoWithKey2);
                        
                        join.setFirstInput(keyMapper1);
@@ -333,7 +357,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                
                private static <I1, I2, K, OUT> PlanUnwrappingJoinOperator<I1, 
I2, OUT, K> translateSelectorFunctionJoinRight(
                                int[] logicalKeyPositions1, 
Keys.SelectorFunctionKeys<I2, ?> rawKeys2, 
-                               JoinFunction<I1, I2, OUT> function, 
+                               FlatJoinFunction<I1, I2, OUT> function,
                                TypeInformation<I1> inputType1, 
TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name,
                                Operator<I1> input1, Operator<I2> input2)
                {
@@ -350,10 +374,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                        final TupleKeyExtractingMapper<I1, K> extractor1 = new 
TupleKeyExtractingMapper<I1, K>(logicalKeyPositions1[0]);
                        final KeyExtractingMapper<I2, K> extractor2 = new 
KeyExtractingMapper<I2, K>(keys2.getKeyExtractor());
 
-                       final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, 
Tuple2<K, I1>>> keyMapper1 =
-                                       new MapOperatorBase<I1, Tuple2<K, I1>, 
GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, 
Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
-                       final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, 
Tuple2<K, I2>>> keyMapper2 =
-                                       new MapOperatorBase<I2, Tuple2<K, I2>, 
GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, 
Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
+                       final MapOperatorBase<I1, Tuple2<K, I1>, 
MapFunction<I1, Tuple2<K, I1>>> keyMapper1 =
+                                       new MapOperatorBase<I1, Tuple2<K, I1>, 
MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, 
Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
+                       final MapOperatorBase<I2, Tuple2<K, I2>, 
MapFunction<I2, Tuple2<K, I2>>> keyMapper2 =
+                                       new MapOperatorBase<I2, Tuple2<K, I2>, 
MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, 
Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
                        
                        final PlanUnwrappingJoinOperator<I1, I2, OUT, K> join = 
new PlanUnwrappingJoinOperator<I1, I2, OUT, K>(function, logicalKeyPositions1, 
keys2, name, outputType, typeInfoWithKey1, typeInfoWithKey2);
                        
@@ -371,7 +395,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                
                private static <I1, I2, K, OUT> PlanUnwrappingJoinOperator<I1, 
I2, OUT, K> translateSelectorFunctionJoinLeft(
                                Keys.SelectorFunctionKeys<I1, ?> rawKeys1, 
int[] logicalKeyPositions2,
-                               JoinFunction<I1, I2, OUT> function, 
+                               FlatJoinFunction<I1, I2, OUT> function,
                                TypeInformation<I1> inputType1, 
TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name,
                                Operator<I1> input1, Operator<I2> input2)
                {
@@ -388,10 +412,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                        final KeyExtractingMapper<I1, K> extractor1 = new 
KeyExtractingMapper<I1, K>(keys1.getKeyExtractor());
                        final TupleKeyExtractingMapper<I2, K> extractor2 = new 
TupleKeyExtractingMapper<I2, K>(logicalKeyPositions2[0]);
 
-                       final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, 
Tuple2<K, I1>>> keyMapper1 =
-                                       new MapOperatorBase<I1, Tuple2<K, I1>, 
GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, 
Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
-                       final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, 
Tuple2<K, I2>>> keyMapper2 =
-                                       new MapOperatorBase<I2, Tuple2<K, I2>, 
GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, 
Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
+                       final MapOperatorBase<I1, Tuple2<K, I1>, 
MapFunction<I1, Tuple2<K, I1>>> keyMapper1 =
+                                       new MapOperatorBase<I1, Tuple2<K, I1>, 
MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, 
Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
+                       final MapOperatorBase<I2, Tuple2<K, I2>, 
MapFunction<I2, Tuple2<K, I2>>> keyMapper2 =
+                                       new MapOperatorBase<I2, Tuple2<K, I2>, 
MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, 
Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
                        
                        final PlanUnwrappingJoinOperator<I1, I2, OUT, K> join = 
new PlanUnwrappingJoinOperator<I1, I2, OUT, K>(function, keys1, 
logicalKeyPositions2, name, outputType, typeInfoWithKey1, typeInfoWithKey2);
                        
@@ -424,28 +448,73 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                                Keys<I1> keys1, Keys<I2> keys2, JoinHint hint)
                {
                        super(input1, input2, keys1, keys2, 
-                               (JoinFunction<I1, I2, Tuple2<I1, I2>>) new 
DefaultJoinFunction<I1, I2>(),
+                               (RichFlatJoinFunction<I1, I2, Tuple2<I1, I2>>) 
new DefaultFlatJoinFunction<I1, I2>(),
                                new TupleTypeInfo<Tuple2<I1, 
I2>>(input1.getType(), input2.getType()), hint);
                }
                
                /**
-                * Finalizes a Join transformation by applying a {@link 
JoinFunction} to each pair of joined elements.<br/>
+                * Finalizes a Join transformation by applying a {@link 
org.apache.flink.api.java.functions.RichFlatJoinFunction} to each pair of 
joined elements.<br/>
                 * Each JoinFunction call returns exactly one element. 
                 * 
                 * @param function The JoinFunction that is called for each 
pair of joined elements.
                 * @return An EquiJoin that represents the joined result DataSet
                 * 
-                * @see JoinFunction
+                * @see org.apache.flink.api.java.functions.RichFlatJoinFunction
                 * @see 
org.apache.flink.api.java.operators.JoinOperator.EquiJoin
                 * @see DataSet
                 */
-               public <R> EquiJoin<I1, I2, R> with(JoinFunction<I1, I2, R> 
function) {
+               public <R> EquiJoin<I1, I2, R> with(FlatJoinFunction<I1, I2, R> 
function) {
                        if (function == null) {
                                throw new NullPointerException("Join function 
must not be null.");
                        }
+                       if (FunctionUtils.isSerializedLambdaFunction(function)) 
{
+                               throw new 
UnsupportedLambdaExpressionException();
+                       }
                        TypeInformation<R> returnType = 
TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
                        return new EquiJoin<I1, I2, R>(getInput1(), 
getInput2(), getKeys1(), getKeys2(), function, returnType, getJoinHint());
                }
+
+               public <R> EquiJoin<I1, I2, R> with (JoinFunction<I1, I2, R> 
function) {
+                       if (function == null) {
+                               throw new NullPointerException("Join function 
must not be null.");
+                       }
+                       if (FunctionUtils.isSerializedLambdaFunction(function)) 
{
+                               throw new 
UnsupportedLambdaExpressionException();
+                       }
+                       FlatJoinFunction generatedFunction = new 
WrappingFlatJoinFunction<I1, I2, R>(function);
+                       TypeInformation<R> returnType = 
TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
+                       return new EquiJoin<I1, I2, R>(getInput1(), 
getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, 
getJoinHint());
+               }
+
+               private static class WrappingFlatJoinFunction<IN1, IN2, OUT> 
extends WrappingFunction<JoinFunction<IN1,IN2,OUT>> implements 
FlatJoinFunction<IN1, IN2, OUT> {
+
+                       private static final long serialVersionUID = 1L;
+
+                       private WrappingFlatJoinFunction(JoinFunction<IN1, IN2, 
OUT> wrappedFunction) {
+                               super(wrappedFunction);
+                       }
+
+                       @Override
+                       public void join(IN1 left, IN2 right, Collector<OUT> 
out) throws Exception {
+                               out.collect (this.wrappedFunction.join(left, 
right));
+                       }
+               }
+
+               /*
+               private static class GeneratedFlatJoinFunction<IN1, IN2, OUT> 
extends FlatJoinFunction<IN1, IN2, OUT> {
+
+                       private Joinable<IN1,IN2,OUT> function;
+
+                       private GeneratedFlatJoinFunction(Joinable<IN1, IN2, 
OUT> function) {
+                               this.function = function;
+                       }
+
+                       @Override
+                       public void join(IN1 first, IN2 second, Collector<OUT> 
out) throws Exception {
+                               out.collect(function.join(first, second));
+                       }
+               }
+               */
                
                /**
                 * Initiates a ProjectJoin transformation and projects the 
first join input<br/>
@@ -530,7 +599,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                
                protected ProjectJoin(DataSet<I1> input1, DataSet<I2> input2, 
Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, int[] fields, boolean[] 
isFromFirst, TupleTypeInfo<OUT> returnType) {
                        super(input1, input2, keys1, keys2, 
-                                       new ProjectJoinFunction<I1, I2, 
OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()), 
+                                       new ProjectFlatJoinFunction<I1, I2, 
OUT>(fields, isFromFirst, returnType.createSerializer().createInstance()),
                                        returnType, hint);
                }
 
@@ -821,20 +890,20 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
        //  default join functions
        // 
--------------------------------------------------------------------------------------------
        
-       public static final class DefaultJoinFunction<T1, T2> extends 
JoinFunction<T1, T2, Tuple2<T1, T2>> {
+       public static final class DefaultFlatJoinFunction<T1, T2> extends 
RichFlatJoinFunction<T1, T2, Tuple2<T1, T2>> {
 
                private static final long serialVersionUID = 1L;
                private final Tuple2<T1, T2> outTuple = new Tuple2<T1, T2>();
 
                @Override
-               public Tuple2<T1, T2> join(T1 first, T2 second) throws 
Exception {
+               public void join(T1 first, T2 second, Collector<Tuple2<T1,T2>> 
out) throws Exception {
                        outTuple.f0 = first;
                        outTuple.f1 = second;
-                       return outTuple;
+                       out.collect(outTuple);
                }
        }
        
-       public static final class ProjectJoinFunction<T1, T2, R extends Tuple> 
extends JoinFunction<T1, T2, R> {
+       public static final class ProjectFlatJoinFunction<T1, T2, R extends 
Tuple> extends RichFlatJoinFunction<T1, T2, R> {
                
                private static final long serialVersionUID = 1L;
                
@@ -851,7 +920,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                 * @param isFromFirst List of flags indicating whether the 
field should be copied from the first (true) or the second (false) input.
                 * @param outTupleInstance An instance of an output tuple.
                 */
-               private ProjectJoinFunction(int[] fields, boolean[] 
isFromFirst, R outTupleInstance) {
+               private ProjectFlatJoinFunction(int[] fields, boolean[] 
isFromFirst, R outTupleInstance) {
                        
                        if(fields.length != isFromFirst.length) {
                                throw new IllegalArgumentException("Fields and 
isFromFirst arrays must have same length!"); 
@@ -869,7 +938,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                        return isFromFirst;
                }
 
-               public R join(T1 in1, T2 in2) {
+               public void join(T1 in1, T2 in2, Collector<R> out) {
                        for(int i=0; i<fields.length; i++) {
                                if(isFromFirst[i]) {
                                        if(fields[i] >= 0) {
@@ -885,27 +954,33 @@ public abstract class JoinOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1,
                                        }
                                }
                        }
-                       return outTuple;
+                       out.collect(outTuple);
                }
        }
        
-       public static final class LeftSemiJoinFunction<T1, T2> extends 
JoinFunction<T1, T2, T1> {
+       public static final class LeftSemiFlatJoinFunction<T1, T2> extends 
RichFlatJoinFunction<T1, T2, T1> {
 
                private static final long serialVersionUID = 1L;
 
                @Override
-               public T1 join(T1 left, T2 right) throws Exception {
-                       return left;
+               //public T1 join(T1 left, T2 right) throws Exception {
+               //      return left;
+               //}
+               public void join (T1 left, T2 right, Collector<T1> out) {
+                       out.collect(left);
                }
        }
        
-       public static final class RightSemiJoinFunction<T1, T2> extends 
JoinFunction<T1, T2, T2> {
+       public static final class RightSemiFlatJoinFunction<T1, T2> extends 
RichFlatJoinFunction<T1, T2, T2> {
 
                private static final long serialVersionUID = 1L;
 
                @Override
-               public T2 join(T1 left, T2 right) throws Exception {
-                       return right;
+               //public T2 join(T1 left, T2 right) throws Exception {
+               //      return right;
+               //}
+               public void join (T1 left, T2 right, Collector<T2> out) {
+                       out.collect(right);
                }
        }
        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
index 03c6037..eccdeec 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.api.java.operators;
 
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.java.functions.MapFunction;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import org.apache.flink.api.java.DataSet;
@@ -34,7 +33,7 @@ import org.apache.flink.api.java.DataSet;
  * @param <IN> The type of the data set consumed by the operator.
  * @param <OUT> The type of the data set created by the operator.
  * 
- * @see MapFunction
+ * @see org.apache.flink.api.common.functions.MapFunction
  */
 public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, 
MapOperator<IN, OUT>> {
        
@@ -42,6 +41,7 @@ public class MapOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OUT, MapOpe
        
        
        public MapOperator(DataSet<IN> input, MapFunction<IN, OUT> function) {
+
                super(input, TypeExtractor.getMapReturnTypes(function, 
input.getType()));
                
                this.function = function;
@@ -49,11 +49,11 @@ public class MapOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OUT, MapOpe
        }
 
        @Override
-       protected 
org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, 
GenericMap<IN, OUT>> translateToDataFlow(Operator<IN> input) {
+       protected 
org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, 
MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
                
                String name = getName() != null ? getName() : 
function.getClass().getName();
                // create operator
-               MapOperatorBase<IN, OUT, GenericMap<IN, OUT>> po = new 
MapOperatorBase<IN, OUT, GenericMap<IN, OUT>>(function, new 
UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
+               MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> po = new 
MapOperatorBase<IN, OUT, MapFunction<IN, OUT>>(function, new 
UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
                // set input
                po.setInput(input);
                // set dop

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
index 9e94670..dd5a3bd 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.java.operators;
 
 import java.util.Arrays;
 
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
@@ -51,7 +51,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
        }
 
        @Override
-       protected 
org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, 
GenericMap<IN,OUT>> translateToDataFlow(Operator<IN> input) {             
+       protected 
org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, 
MapFunction<IN,OUT>> translateToDataFlow(Operator<IN> input) {
                String name = getName() != null ? getName() : "Projection " + 
Arrays.toString(fields);
                // create operator
                PlanProjectOperator<IN, OUT> ppo = new PlanProjectOperator<IN, 
OUT>(fields, name, getInputType(), getResultType());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
deleted file mode 100644
index d88d43d..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.operators;
-
-import org.apache.flink.api.common.functions.GenericCombine;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
-import org.apache.flink.api.common.functions.GenericMap;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction.Combinable;
-import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
-import 
org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.types.TypeInformation;
-
-import org.apache.flink.api.java.DataSet;
-
-/**
- * This operator represents the application of a "reduceGroup" function on a 
data set, and the
- * result data set produced by the function.
- * 
- * @param <IN> The type of the data set consumed by the operator.
- * @param <OUT> The type of the data set created by the operator.
- */
-public class ReduceGroupOperator<IN, OUT> extends SingleInputUdfOperator<IN, 
OUT, ReduceGroupOperator<IN, OUT>> {
-       
-       private final GroupReduceFunction<IN, OUT> function;
-       
-       private final Grouping<IN> grouper;
-       
-       private boolean combinable;
-       
-       
-       /**
-        * Constructor for a non-grouped reduce (all reduce).
-        * 
-        * @param input The input data set to the groupReduce function.
-        * @param function The user-defined GroupReduce function.
-        */
-       public ReduceGroupOperator(DataSet<IN> input, GroupReduceFunction<IN, 
OUT> function) {
-               super(input, TypeExtractor.getGroupReduceReturnTypes(function, 
input.getType()));
-               
-               this.function = function;
-               this.grouper = null;
-               checkCombinability();
-       }
-       
-       /**
-        * Constructor for a grouped reduce.
-        * 
-        * @param input The grouped input to be processed group-wise by the 
groupReduce function.
-        * @param function The user-defined GroupReduce function.
-        */
-       public ReduceGroupOperator(Grouping<IN> input, GroupReduceFunction<IN, 
OUT> function) {
-               super(input != null ? input.getDataSet() : null, 
TypeExtractor.getGroupReduceReturnTypes(function, 
input.getDataSet().getType()));
-               
-               this.function = function;
-               this.grouper = input;
-               checkCombinability();
-               
-               extractSemanticAnnotationsFromUdf(function.getClass());
-       }
-       
-       private void checkCombinability() {
-               if (function instanceof GenericCombine && 
function.getClass().getAnnotation(Combinable.class) != null) {
-                       this.combinable = true;
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  Properties
-       // 
--------------------------------------------------------------------------------------------
-       
-       public boolean isCombinable() {
-               return combinable;
-       }
-       
-       public void setCombinable(boolean combinable) {
-               // sanity check that the function is a subclass of the combine 
interface
-               if (combinable && !(function instanceof GenericCombine)) {
-                       throw new IllegalArgumentException("The function does 
not implement the combine interface.");
-               }
-               
-               this.combinable = combinable;
-       }
-       
-       @Override
-       protected 
org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, OUT, ?> 
translateToDataFlow(Operator<IN> input) {
-               
-               String name = getName() != null ? getName() : 
function.getClass().getName();
-               
-               // distinguish between grouped reduce and non-grouped reduce
-               if (grouper == null) {
-                       // non grouped reduce
-                       UnaryOperatorInformation<IN, OUT> operatorInfo = new 
UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
-                       GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, 
OUT>> po =
-                                       new GroupReduceOperatorBase<IN, OUT, 
GenericGroupReduce<IN, OUT>>(function, operatorInfo, new int[0], name);
-
-                       po.setCombinable(combinable);
-                       // set input
-                       po.setInput(input);
-                       // the degree of parallelism for a non grouped reduce 
can only be 1
-                       po.setDegreeOfParallelism(1);
-                       return po;
-               }
-       
-               if (grouper.getKeys() instanceof Keys.SelectorFunctionKeys) {
-               
-                       @SuppressWarnings("unchecked")
-                       Keys.SelectorFunctionKeys<IN, ?> selectorKeys = 
(Keys.SelectorFunctionKeys<IN, ?>) grouper.getKeys();
-                       
-                       PlanUnwrappingReduceGroupOperator<IN, OUT, ?> po = 
translateSelectorFunctionReducer(
-                                                       selectorKeys, function, 
getInputType(), getResultType(), name, input, isCombinable());
-                       
-                       po.setDegreeOfParallelism(this.getParallelism());
-                       
-                       return po;
-               }
-               else if (grouper.getKeys() instanceof Keys.FieldPositionKeys) {
-
-                       int[] logicalKeyPositions = 
grouper.getKeys().computeLogicalKeyPositions();
-                       UnaryOperatorInformation<IN, OUT> operatorInfo = new 
UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
-                       GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, 
OUT>> po =
-                                       new GroupReduceOperatorBase<IN, OUT, 
GenericGroupReduce<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
-
-                       po.setCombinable(combinable);
-                       po.setInput(input);
-                       po.setDegreeOfParallelism(this.getParallelism());
-                       
-                       // set group order
-                       if (grouper instanceof SortedGrouping) {
-                               SortedGrouping<IN> sortedGrouper = 
(SortedGrouping<IN>) grouper;
-                                                               
-                               int[] sortKeyPositions = 
sortedGrouper.getGroupSortKeyPositions();
-                               Order[] sortOrders = 
sortedGrouper.getGroupSortOrders();
-                               
-                               Ordering o = new Ordering();
-                               for(int i=0; i < sortKeyPositions.length; i++) {
-                                       o.appendOrdering(sortKeyPositions[i], 
null, sortOrders[i]);
-                               }
-                               po.setGroupOrder(o);
-                       }
-                       
-                       return po;
-               }
-               else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
-
-                       int[] logicalKeyPositions = 
grouper.getKeys().computeLogicalKeyPositions();
-                       UnaryOperatorInformation<IN, OUT> operatorInfo = new 
UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
-                       GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, 
OUT>> po =
-                                       new GroupReduceOperatorBase<IN, OUT, 
GenericGroupReduce<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
-
-                       po.setCombinable(combinable);
-                       po.setInput(input);
-                       po.setDegreeOfParallelism(this.getParallelism());
-                       
-                       return po;
-               }
-               else {
-                       throw new UnsupportedOperationException("Unrecognized 
key type.");
-               }
-               
-       }
-       
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, 
K> translateSelectorFunctionReducer(
-                       Keys.SelectorFunctionKeys<IN, ?> rawKeys, 
GroupReduceFunction<IN, OUT> function,
-                       TypeInformation<IN> inputType, TypeInformation<OUT> 
outputType, String name, Operator<IN> input,
-                       boolean combinable)
-       {
-               @SuppressWarnings("unchecked")
-               final Keys.SelectorFunctionKeys<IN, K> keys = 
(Keys.SelectorFunctionKeys<IN, K>) rawKeys;
-               
-               TypeInformation<Tuple2<K, IN>> typeInfoWithKey = new 
TupleTypeInfo<Tuple2<K, IN>>(keys.getKeyType(), inputType);
-               
-               KeyExtractingMapper<IN, K> extractor = new 
KeyExtractingMapper<IN, K>(keys.getKeyExtractor());
-               
-               PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer = new 
PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, 
typeInfoWithKey, combinable);
-               
-               MapOperatorBase<IN, Tuple2<K, IN>, GenericMap<IN, Tuple2<K, 
IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, GenericMap<IN, Tuple2<K, 
IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, 
typeInfoWithKey), "Key Extractor");
-
-               reducer.setInput(mapper);
-               mapper.setInput(input);
-               
-               // set the mapper's parallelism to the input parallelism to 
make sure it is chained
-               mapper.setDegreeOfParallelism(input.getDegreeOfParallelism());
-               
-               return reducer;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index 12e0f89..13a6c91 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -18,13 +18,12 @@
 
 package org.apache.flink.api.java.operators;
 
-import org.apache.flink.api.common.functions.GenericMap;
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
-import org.apache.flink.api.java.functions.ReduceFunction;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
 import 
org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator;
@@ -40,7 +39,7 @@ import org.apache.flink.api.java.DataSet;
  * 
  * @param <IN> The type of the data set reduced by the operator.
  * 
- * @see ReduceFunction
+ * @see org.apache.flink.api.common.functions.ReduceFunction
  */
 public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, 
ReduceOperator<IN>> {
        
@@ -83,8 +82,8 @@ public class ReduceOperator<IN> extends 
SingleInputUdfOperator<IN, IN, ReduceOpe
                if (grouper == null) {
                        // non grouped reduce
                        UnaryOperatorInformation<IN, IN> operatorInfo = new 
UnaryOperatorInformation<IN, IN>(getInputType(), getInputType());
-                       ReduceOperatorBase<IN, GenericReduce<IN>> po =
-                                       new ReduceOperatorBase<IN, 
GenericReduce<IN>>(function, operatorInfo, new int[0], name);
+                       ReduceOperatorBase<IN, ReduceFunction<IN>> po =
+                                       new ReduceOperatorBase<IN, 
ReduceFunction<IN>>(function, operatorInfo, new int[0], name);
                        // set input
                        po.setInput(input);
                        
@@ -109,8 +108,8 @@ public class ReduceOperator<IN> extends 
SingleInputUdfOperator<IN, IN, ReduceOpe
                        // reduce with field positions
                        int[] logicalKeyPositions = 
grouper.getKeys().computeLogicalKeyPositions();
                        UnaryOperatorInformation<IN, IN> operatorInfo = new 
UnaryOperatorInformation<IN, IN>(getInputType(), getInputType());
-                       ReduceOperatorBase<IN, GenericReduce<IN>> po =
-                                       new ReduceOperatorBase<IN, 
GenericReduce<IN>>(function, operatorInfo, logicalKeyPositions, name);
+                       ReduceOperatorBase<IN, ReduceFunction<IN>> po =
+                                       new ReduceOperatorBase<IN, 
ReduceFunction<IN>>(function, operatorInfo, logicalKeyPositions, name);
                        
                        // set input
                        po.setInput(input);
@@ -139,8 +138,8 @@ public class ReduceOperator<IN> extends 
SingleInputUdfOperator<IN, IN, ReduceOpe
                
                PlanUnwrappingReduceOperator<T, K> reducer = new 
PlanUnwrappingReduceOperator<T, K>(function, keys, name, inputType, 
typeInfoWithKey);
                
-               MapOperatorBase<T, Tuple2<K, T>, GenericMap<T, Tuple2<K, T>>> 
keyExtractingMap = new MapOperatorBase<T, Tuple2<K, T>, GenericMap<T, Tuple2<K, 
T>>>(extractor, new UnaryOperatorInformation<T, Tuple2<K, T>>(inputType, 
typeInfoWithKey), "Key Extractor");
-               MapOperatorBase<Tuple2<K, T>, T, GenericMap<Tuple2<K, T>, T>> 
keyRemovingMap = new MapOperatorBase<Tuple2<K, T>, T, GenericMap<Tuple2<K, T>, 
T>>(new KeyRemovingMapper<T, K>(), new UnaryOperatorInformation<Tuple2<K, T>, 
T>(typeInfoWithKey, inputType), "Key Extractor");
+               MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> 
keyExtractingMap = new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, 
Tuple2<K, T>>>(extractor, new UnaryOperatorInformation<T, Tuple2<K, 
T>>(inputType, typeInfoWithKey), "Key Extractor");
+               MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> 
keyRemovingMap = new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, 
T>>(new KeyRemovingMapper<T, K>(), new UnaryOperatorInformation<Tuple2<K, T>, 
T>(typeInfoWithKey, inputType), "Key Extractor");
 
                keyExtractingMap.setInput(input);
                reducer.setInput(keyExtractingMap);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
index fa2c1aa..dcdbed4 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
@@ -35,8 +35,8 @@ import org.apache.flink.api.java.DataSet;
 /**
  * The <tt>SingleInputUdfOperator</tt> is the base class of all unary 
operators that execute
  * user-defined functions (UDFs). The UDFs encapsulated by this operator are 
naturally UDFs that
- * have one input (such as {@link 
org.apache.flink.api.java.functions.MapFunction} or
- * {@link org.apache.flink.api.java.functions.ReduceFunction}).
+ * have one input (such as {@link 
org.apache.flink.api.java.functions.RichMapFunction} or
+ * {@link org.apache.flink.api.java.functions.RichReduceFunction}).
  * <p>
  * This class encapsulates utilities for the UDFs, such as broadcast 
variables, parameterization
  * through configuration objects, and semantic properties.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 89c8bb2..97b2417 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -21,16 +21,18 @@ package org.apache.flink.api.java.operators;
 import java.util.Arrays;
 
 import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
 
 import org.apache.flink.api.java.DataSet;
+import 
org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 
 /**
  * SortedGrouping is an intermediate step for a transformation on a grouped 
and sorted DataSet.<br/>
  * The following transformation can be applied on sorted groups:
  * <ul>
- *     <li>{@link SortedGrouping#reduceGroup(GroupReduceFunction)},</li>
+ *     <li>{@link 
SortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)},</li>
  * </ul>
  * 
  * @param <T> The type of the elements of the sorted and grouped DataSet.
@@ -65,23 +67,27 @@ public class SortedGrouping<T> extends Grouping<T> {
 
        /**
         * Applies a GroupReduce transformation on a grouped and sorted {@link 
DataSet}.<br/>
-        * The transformation calls a {@link GroupReduceFunction} for each 
group of the DataSet.
+        * The transformation calls a {@link 
org.apache.flink.api.java.functions.RichGroupReduceFunction} for each group of 
the DataSet.
         * A GroupReduceFunction can iterate over all elements of a group and 
emit any
         *   number of output elements including none.
         * 
         * @param reducer The GroupReduceFunction that is applied on each group 
of the DataSet.
         * @return A GroupReduceOperator that represents the reduced DataSet.
         * 
-        * @see GroupReduceFunction
-        * @see ReduceGroupOperator
+        * @see org.apache.flink.api.java.functions.RichGroupReduceFunction
+        * @see GroupReduceOperator
         * @see DataSet
         */
-       public <R> ReduceGroupOperator<T, R> reduceGroup(GroupReduceFunction<T, 
R> reducer) {
+       public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, 
R> reducer) {
                if (reducer == null) {
                        throw new NullPointerException("GroupReduce function 
must not be null.");
                }
-               return new ReduceGroupOperator<T, R>(this, reducer);
+               if (FunctionUtils.isSerializedLambdaFunction(reducer)) {
+                       throw new UnsupportedLambdaExpressionException();
+               }
+               return new GroupReduceOperator<T, R>(this, reducer);
        }
+
        
        // 
--------------------------------------------------------------------------------------------
        //  Group Operations

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
index a85ca3f..f347fef 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
@@ -35,8 +35,8 @@ import org.apache.flink.api.java.DataSet;
 /**
  * The <tt>TwoInputUdfOperator</tt> is the base class of all binary operators 
that execute
  * user-defined functions (UDFs). The UDFs encapsulated by this operator are 
naturally UDFs that
- * have two inputs (such as {@link 
org.apache.flink.api.java.functions.JoinFunction} or 
- * {@link org.apache.flink.api.java.functions.CoGroupFunction}).
+ * have two inputs (such as {@link 
org.apache.flink.api.java.functions.RichJoinFunction} or
+ * {@link org.apache.flink.api.java.functions.RichCoGroupFunction}).
  * <p>
  * This class encapsulates utilities for the UDFs, such as broadcast 
variables, parameterization
  * through configuration objects, and semantic properties.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
index 2040a27..bf33f4e 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
@@ -27,8 +27,8 @@ import org.apache.flink.api.java.DataSet;
 
 /**
  * This interface marks operators as operators that execute user-defined 
functions (UDFs), such as
- * {@link org.apache.flink.api.java.functions.MapFunction}, {@link 
org.apache.flink.api.java.functions.ReduceFunction},
- * or {@link org.apache.flink.api.java.functions.CoGroupFunction}.
+ * {@link org.apache.flink.api.java.functions.RichMapFunction}, {@link 
org.apache.flink.api.java.functions.RichReduceFunction},
+ * or {@link org.apache.flink.api.java.functions.RichCoGroupFunction}.
  * The UDF operators stand in contrast to operators that execute built-in 
operations, like aggregations.
  */
 public interface UdfOperator<O extends UdfOperator<O>> {
@@ -39,7 +39,7 @@ public interface UdfOperator<O extends UdfOperator<O>> {
        
        /**
         * Gets the configuration parameters that will be passed to the UDF's 
open method
-        * {@link 
org.apache.flink.api.common.functions.AbstractFunction#open(Configuration)}. 
+        * {@link 
org.apache.flink.api.common.functions.AbstractRichFunction#open(Configuration)}.
         * The configuration is set via the {@link 
#withParameters(Configuration)}
         * method.
         * 
@@ -69,7 +69,7 @@ public interface UdfOperator<O extends UdfOperator<O>> {
        
        /**
         * Sets the configuration parameters for the UDF. These are optional 
parameters that are passed
-        * to the UDF in the {@link 
org.apache.flink.api.common.functions.AbstractFunction#open(Configuration)} 
method.
+        * to the UDF in the {@link 
org.apache.flink.api.common.functions.AbstractRichFunction#open(Configuration)} 
method.
         * 
         * @param parameters The configuration parameters for the UDF.
         * @return The operator itself, to allow chaining function calls.
@@ -83,7 +83,7 @@ public interface UdfOperator<O extends UdfOperator<O>> {
         * {@link 
org.apache.flink.api.common.functions.RuntimeContext#getBroadcastVariable(String)}.
         * 
         * The runtime context itself is available in all UDFs via
-        * {@link 
org.apache.flink.api.common.functions.AbstractFunction#getRuntimeContext()}.
+        * {@link 
org.apache.flink.api.common.functions.AbstractRichFunction#getRuntimeContext()}.
         * 
         * @param data The data set to be broadcasted.
         * @param name The name under which the broadcast data set retrieved.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 1d9d70d..9e71ba0 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.api.java.operators;
 
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
 
 import org.apache.flink.api.java.DataSet;
+import 
org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 
 public class UnsortedGrouping<T> extends Grouping<T> {
 
@@ -90,14 +92,14 @@ public class UnsortedGrouping<T> extends Grouping<T> {
        
        /**
         * Applies a Reduce transformation on a grouped {@link DataSet}.<br/>
-        * For each group, the transformation consecutively calls a {@link 
ReduceFunction} 
+        * For each group, the transformation consecutively calls a {@link 
org.apache.flink.api.java.functions.RichReduceFunction}
         *   until only a single element for each group remains. 
         * A ReduceFunction combines two elements into one new element of the 
same type.
         * 
         * @param reducer The ReduceFunction that is applied on each group of 
the DataSet.
         * @return A ReduceOperator that represents the reduced DataSet.
         * 
-        * @see ReduceFunction
+        * @see org.apache.flink.api.java.functions.RichReduceFunction
         * @see ReduceOperator
         * @see DataSet
         */
@@ -110,24 +112,28 @@ public class UnsortedGrouping<T> extends Grouping<T> {
        
        /**
         * Applies a GroupReduce transformation on a grouped {@link 
DataSet}.<br/>
-        * The transformation calls a {@link GroupReduceFunction} for each 
group of the DataSet.
+        * The transformation calls a {@link 
org.apache.flink.api.java.functions.RichGroupReduceFunction} for each group of 
the DataSet.
         * A GroupReduceFunction can iterate over all elements of a group and 
emit any
         *   number of output elements including none.
         * 
         * @param reducer The GroupReduceFunction that is applied on each group 
of the DataSet.
         * @return A GroupReduceOperator that represents the reduced DataSet.
         * 
-        * @see GroupReduceFunction
-        * @see ReduceGroupOperator
+        * @see org.apache.flink.api.java.functions.RichGroupReduceFunction
+        * @see GroupReduceOperator
         * @see DataSet
         */
-       public <R> ReduceGroupOperator<T, R> reduceGroup(GroupReduceFunction<T, 
R> reducer) {
+       public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, 
R> reducer) {
                if (reducer == null) {
                        throw new NullPointerException("GroupReduce function 
must not be null.");
                }
-               return new ReduceGroupOperator<T, R>(this, reducer);
+               if (FunctionUtils.isSerializedLambdaFunction(reducer)) {
+                       throw new UnsupportedLambdaExpressionException();
+               }
+               return new GroupReduceOperator<T, R>(this, reducer);
        }
 
+
        // 
--------------------------------------------------------------------------------------------
        //  Group Operations
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
index aea99e3..c7f65f0 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
@@ -19,11 +19,11 @@
 package org.apache.flink.api.java.operators.translation;
 
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 
 
-public final class KeyExtractingMapper<T, K> extends MapFunction<T, Tuple2<K, 
T>> {
+public final class KeyExtractingMapper<T, K> extends RichMapFunction<T, 
Tuple2<K, T>> {
        
        private static final long serialVersionUID = 1L;
        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
index 52cbcd3..a6cd837 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 
 
-public final class KeyRemovingMapper<T, K> extends MapFunction<Tuple2<K, T>, 
T> {
+public final class KeyRemovingMapper<T, K> extends RichMapFunction<Tuple2<K, 
T>, T> {
        
        private static final long serialVersionUID = 1L;
        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
index 7fb9c0f..8ac2d01 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
@@ -18,22 +18,22 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import org.apache.flink.api.common.functions.GenericFlatMap;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.FilterOperatorBase;
-import org.apache.flink.api.java.functions.FilterFunction;
 import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.Collector;
 
 
-public class PlanFilterOperator<T> extends FilterOperatorBase<T, 
GenericFlatMap<T, T>> {
+public class PlanFilterOperator<T> extends FilterOperatorBase<T, 
FlatMapFunction<T, T>> {
        
        public PlanFilterOperator(FilterFunction<T> udf, String name, 
TypeInformation<T> type) {
                super(new FlatMapFilter<T>(udf), new 
UnaryOperatorInformation<T, T>(type, type), name);
        }
 
        public static final class FlatMapFilter<T> extends 
WrappingFunction<FilterFunction<T>>
-               implements GenericFlatMap<T, T>
+               implements FlatMapFunction<T, T>
        {
 
                private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
index 521814c..4de7311 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
@@ -18,22 +18,22 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.types.TypeInformation;
 
-public class PlanProjectOperator<T, R extends Tuple> extends 
MapOperatorBase<T, R, GenericMap<T, R>> {
+public class PlanProjectOperator<T, R extends Tuple> extends 
MapOperatorBase<T, R, MapFunction<T, R>> {
 
        public PlanProjectOperator(int[] fields, String name, 
TypeInformation<T> inType, TypeInformation<R> outType) {
                super(new MapProjector<T, R>(fields, 
outType.createSerializer().createInstance()), new UnaryOperatorInformation<T, 
R>(inType, outType), name);
        }
        
        public static final class MapProjector<T, R extends Tuple>
-               extends AbstractFunction
-               implements GenericMap<T, R>
+               extends AbstractRichFunction
+               implements MapFunction<T, R>
        {
                private static final long serialVersionUID = 1L;
                

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
index 20bd3b0..89290f0 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingCoGroupOperator.java
@@ -20,20 +20,19 @@ package org.apache.flink.api.java.operators.translation;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
-import org.apache.flink.api.java.functions.CoGroupFunction;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.Collector;
 
 public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K> 
-       extends CoGroupOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, 
GenericCoGrouper<Tuple2<K, I1>, Tuple2<K, I2>, OUT>>
+       extends CoGroupOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, 
CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>>
 {
 
-       public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf, 
+       public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf,
                        Keys.SelectorFunctionKeys<I1, K> key1, 
Keys.SelectorFunctionKeys<I2, K> key2, String name,
                        TypeInformation<OUT> type, TypeInformation<Tuple2<K, 
I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2)
        {
@@ -42,7 +41,7 @@ public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>
                                key1.computeLogicalKeyPositions(), 
key2.computeLogicalKeyPositions(), name);
        }
        
-       public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf, 
+       public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf,
                        int[] key1, Keys.SelectorFunctionKeys<I2, K> key2, 
String name,
                        TypeInformation<OUT> type, TypeInformation<Tuple2<K, 
I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2)
        {
@@ -51,7 +50,7 @@ public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>
                                new int[]{0}, 
key2.computeLogicalKeyPositions(), name);
        }
        
-       public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf, 
+       public PlanUnwrappingCoGroupOperator(CoGroupFunction<I1, I2, OUT> udf,
                        Keys.SelectorFunctionKeys<I1, K> key1, int[] key2, 
String name,
                        TypeInformation<OUT> type, TypeInformation<Tuple2<K, 
I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2)
        {
@@ -63,7 +62,7 @@ public class PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>
        // 
--------------------------------------------------------------------------------------------
        
        public static final class TupleUnwrappingCoGrouper<I1, I2, OUT, K> 
extends WrappingFunction<CoGroupFunction<I1, I2, OUT>>
-               implements GenericCoGrouper<Tuple2<K, I1>, Tuple2<K, I2>, OUT>
+               implements CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>
        {
                private static final long serialVersionUID = 1L;
                

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
index c121efe..73ea004 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingJoinOperator.java
@@ -18,20 +18,19 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.java.functions.JoinFunction;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.Collector;
 
 public class PlanUnwrappingJoinOperator<I1, I2, OUT, K> 
-       extends JoinOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, 
GenericJoiner<Tuple2<K, I1>, Tuple2<K, I2>, OUT>>
+       extends JoinOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, 
FlatJoinFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>>
 {
 
-       public PlanUnwrappingJoinOperator(JoinFunction<I1, I2, OUT> udf, 
+       public PlanUnwrappingJoinOperator(FlatJoinFunction<I1, I2, OUT> udf,
                        Keys.SelectorFunctionKeys<I1, K> key1, 
Keys.SelectorFunctionKeys<I2, K> key2, String name,
                        TypeInformation<OUT> type, TypeInformation<Tuple2<K, 
I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2)
        {
@@ -40,7 +39,7 @@ public class PlanUnwrappingJoinOperator<I1, I2, OUT, K>
                                key1.computeLogicalKeyPositions(), 
key2.computeLogicalKeyPositions(), name);
        }
        
-       public PlanUnwrappingJoinOperator(JoinFunction<I1, I2, OUT> udf, 
+       public PlanUnwrappingJoinOperator(FlatJoinFunction<I1, I2, OUT> udf,
                        int[] key1, Keys.SelectorFunctionKeys<I2, K> key2, 
String name,
                        TypeInformation<OUT> type, TypeInformation<Tuple2<K, 
I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2)
        {
@@ -49,7 +48,7 @@ public class PlanUnwrappingJoinOperator<I1, I2, OUT, K>
                                new int[]{0}, 
key2.computeLogicalKeyPositions(), name);
        }
        
-       public PlanUnwrappingJoinOperator(JoinFunction<I1, I2, OUT> udf, 
+       public PlanUnwrappingJoinOperator(FlatJoinFunction<I1, I2, OUT> udf,
                        Keys.SelectorFunctionKeys<I1, K> key1, int[] key2, 
String name,
                        TypeInformation<OUT> type, TypeInformation<Tuple2<K, 
I1>> typeInfoWithKey1, TypeInformation<Tuple2<K, I2>> typeInfoWithKey2)
        {
@@ -59,21 +58,26 @@ public class PlanUnwrappingJoinOperator<I1, I2, OUT, K>
        }
 
        public static final class TupleUnwrappingJoiner<I1, I2, OUT, K>
-               extends WrappingFunction<JoinFunction<I1, I2, OUT>>
-               implements GenericJoiner<Tuple2<K, I1>, Tuple2<K, I2>, OUT>
+               extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
+               implements FlatJoinFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>
        {
 
                private static final long serialVersionUID = 1L;
                
-               private TupleUnwrappingJoiner(JoinFunction<I1, I2, OUT> 
wrapped) {
+               private TupleUnwrappingJoiner(FlatJoinFunction<I1, I2, OUT> 
wrapped) {
                        super(wrapped);
                }
 
+               //@SuppressWarnings("unchecked")
+               //@Override
+               //public OUT join(Tuple2<K, I1> value1, Tuple2<K, I2> value2) 
throws Exception {
+               //      return wrappedFunction.join((I1)(value1.getField(1)), 
(I2)(value2.getField(1)));
+               //}
+
                @SuppressWarnings("unchecked")
                @Override
-               public void join(Tuple2<K, I1> value1, Tuple2<K, I2> value2,
-                               Collector<OUT> out) throws Exception {
-                       
out.collect(wrappedFunction.join((I1)(value1.getField(1)), 
(I2)(value2.getField(1))));
+               public void join (Tuple2<K, I1> value1, Tuple2<K, I2> value2, 
Collector<OUT> collector) throws Exception {
+                       wrappedFunction.join ((I1)(value1.getField(1)), 
(I2)(value2.getField(1)), collector);
                }
                
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index 5a59664..5e80455 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.java.operators.translation;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.common.functions.GenericCombine;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction.Combinable;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.TypeInformation;
@@ -35,12 +34,12 @@ import org.apache.flink.util.Collector;
  * A reduce operator that takes 2-tuples (key-value pairs), and applies the 
group reduce operation only
  * on the unwrapped values.
  */
-public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends 
GroupReduceOperatorBase<Tuple2<K, IN>, OUT, GenericGroupReduce<Tuple2<K, 
IN>,OUT>> {
+public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends 
GroupReduceOperatorBase<Tuple2<K, IN>, OUT, GroupReduceFunction<Tuple2<K, 
IN>,OUT>> {
 
        public PlanUnwrappingReduceGroupOperator(GroupReduceFunction<IN, OUT> 
udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
                        TypeInformation<OUT> outType, TypeInformation<Tuple2<K, 
IN>> typeInfoWithKey, boolean combinable)
        {
-               super(combinable ? new 
TupleUnwrappingCombinableGroupReducer<IN, OUT, K>(udf) : new 
TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
+               super(combinable ? new 
TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K>((RichGroupReduceFunction) 
udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
                                new UnaryOperatorInformation<Tuple2<K, IN>, 
OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);
                
                super.setCombinable(combinable);
@@ -48,9 +47,9 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> 
extends GroupReduceOp
        
        // 
--------------------------------------------------------------------------------------------
        
-       @Combinable
-       public static final class TupleUnwrappingCombinableGroupReducer<IN, 
OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
-               implements GenericGroupReduce<Tuple2<K, IN>, OUT>, 
GenericCombine<Tuple2<K, IN>>
+       @RichGroupReduceFunction.Combinable
+       public static final class TupleUnwrappingFlatCombinableGroupReducer<IN, 
OUT, K> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>>
+               implements GroupReduceFunction<Tuple2<K, IN>, OUT>, 
FlatCombineFunction<Tuple2<K, IN>>
        {
 
                private static final long serialVersionUID = 1L;
@@ -58,7 +57,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> 
extends GroupReduceOp
                private TupleUnwrappingIterator<IN, K> iter;
                private TupleWrappingCollector<IN, K> coll; 
                
-               private 
TupleUnwrappingCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) {
+               private 
TupleUnwrappingFlatCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> 
wrapped) {
                        super(wrapped);
                        this.iter = new TupleUnwrappingIterator<IN, K>();
                        this.coll = new TupleWrappingCollector<IN, 
K>(this.iter);
@@ -85,7 +84,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> 
extends GroupReduceOp
        }
        
        public static final class TupleUnwrappingNonCombinableGroupReducer<IN, 
OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
-               implements GenericGroupReduce<Tuple2<K, IN>, OUT>
+               implements GroupReduceFunction<Tuple2<K, IN>, OUT>
        {
        
                private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
index 66aa430..4da981c 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
@@ -18,10 +18,9 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
-import org.apache.flink.api.java.functions.ReduceFunction;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.TypeInformation;
@@ -31,7 +30,7 @@ import org.apache.flink.types.TypeInformation;
  * A reduce operator that takes 2-tuples (key-value pairs), and applies the 
reduce operation only
  * on the unwrapped values.
  */
-public class PlanUnwrappingReduceOperator<T, K> extends 
ReduceOperatorBase<Tuple2<K, T>, GenericReduce<Tuple2<K, T>>> {
+public class PlanUnwrappingReduceOperator<T, K> extends 
ReduceOperatorBase<Tuple2<K, T>, ReduceFunction<Tuple2<K, T>>> {
 
        public PlanUnwrappingReduceOperator(ReduceFunction<T> udf, 
Keys.SelectorFunctionKeys<T, K> key, String name,
                        TypeInformation<T> type, TypeInformation<Tuple2<K, T>> 
typeInfoWithKey)
@@ -40,7 +39,7 @@ public class PlanUnwrappingReduceOperator<T, K> extends 
ReduceOperatorBase<Tuple
        }
 
        public static final class ReduceWrapper<T, K> extends 
WrappingFunction<ReduceFunction<T>>
-               implements GenericReduce<Tuple2<K, T>>
+               implements ReduceFunction<Tuple2<K, T>>
        {
                private static final long serialVersionUID = 1L;
                

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
index a915d1c..ecac775 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleKeyExtractingMapper.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.api.java.operators.translation;
 
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 
 
-public final class TupleKeyExtractingMapper<T, K> extends MapFunction<T, 
Tuple2<K, T>> {
+public final class TupleKeyExtractingMapper<T, K> extends RichMapFunction<T, 
Tuple2<K, T>> {
        
        private static final long serialVersionUID = 1L;
        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
index c98df6b..267d879 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
@@ -29,20 +29,21 @@ import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.Value;
 
 
-public abstract class WrappingFunction<T extends AbstractFunction> extends 
AbstractFunction {
+public abstract class WrappingFunction<T extends Function> extends 
AbstractRichFunction {
        
        private static final long serialVersionUID = 1L;
 
        protected final T wrappedFunction;
-       
-       
+
        protected WrappingFunction(T wrappedFunction) {
                this.wrappedFunction = wrappedFunction;
        }
@@ -50,12 +51,12 @@ public abstract class WrappingFunction<T extends 
AbstractFunction> extends Abstr
        
        @Override
        public void open(Configuration parameters) throws Exception {
-               this.wrappedFunction.open(parameters);
+               FunctionUtils.openFunction(this.wrappedFunction, parameters);
        }
        
        @Override
        public void close() throws Exception {
-               this.wrappedFunction.close();
+               FunctionUtils.closeFunction(this.wrappedFunction);
        }
        
        @Override
@@ -63,13 +64,16 @@ public abstract class WrappingFunction<T extends 
AbstractFunction> extends Abstr
                super.setRuntimeContext(t);
                
                if (t instanceof IterationRuntimeContext) {
-                       this.wrappedFunction.setRuntimeContext(new 
WrappingIterationRuntimeContext(t));
+                       
FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, new 
WrappingIterationRuntimeContext(t));
                }
                else{
-                       this.wrappedFunction.setRuntimeContext(new 
WrappingRuntimeContext(t));
+                       
FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, new 
WrappingRuntimeContext(t));
                }
        }
-       
+
+       public T getWrappedFunction () {
+               return this.wrappedFunction;
+       }
        
        
        private static class WrappingRuntimeContext implements RuntimeContext {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
index 633adab..fa0ca11 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
@@ -21,15 +21,14 @@ package org.apache.flink.api.java.record.functions;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
 /**
  * The CoGroupFunction is the base class for functions that are invoked by a 
{@link org.apache.flink.api.java.operators.CoGroupOperator}.
  */
-public abstract class CoGroupFunction extends AbstractFunction implements 
GenericCoGrouper<Record, Record, Record> {
+public abstract class CoGroupFunction extends AbstractRichFunction implements 
org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> {
 
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
index b2185a2..c4587fd 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
@@ -19,15 +19,13 @@
 
 package org.apache.flink.api.java.record.functions;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericCrosser;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
 
 /**
  * The CrossFunction is the base class for functions that are invoked by a 
{@link org.apache.flink.api.java.operators.CrossOperator}.
  */
-public abstract class CrossFunction extends AbstractFunction implements 
GenericCrosser<Record, Record, Record> {
+public abstract class CrossFunction extends AbstractRichFunction implements 
org.apache.flink.api.common.functions.CrossFunction<Record, Record, Record> {
        
        private static final long serialVersionUID = 1L;
        
@@ -35,14 +33,19 @@ public abstract class CrossFunction extends 
AbstractFunction implements GenericC
         * This method must be implemented to provide a user implementation of 
a cross.
         * It is called for each element of the Cartesian product of both input 
sets.
 
-        * @param record1 The record from the second input.
-        * @param record2 The record from the second input.
-        * @param out A collector that collects all output records.
+        * @param first The record from the second input.
+        * @param second The record from the second input.
+        * @return The result of the cross UDF
         * 
         * @throws Exception Implementations may forward exceptions, which are 
caught by the runtime. When the
         *                   runtime catches an exception, it aborts the task 
and lets the fail-over logic
         *                   decide whether to retry the task execution.
         */
+
+       //@Override
+       //public abstract void cross(Record record1, Record record2, 
Collector<Record> out) throws Exception;
+
        @Override
-       public abstract void cross(Record record1, Record record2, 
Collector<Record> out) throws Exception;
+       public abstract Record cross(Record first, Record second) throws 
Exception;
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
index 0222c63..dce24a3 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
@@ -19,8 +19,8 @@
 
 package org.apache.flink.api.java.record.functions;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
@@ -28,7 +28,7 @@ import org.apache.flink.util.Collector;
  * The JoinFunction must implementation by functions of a {@link 
org.apache.flink.api.java.operators.JoinOperator}.
  * It resembles an equality join of both inputs on their key fields.
  */
-public abstract class JoinFunction extends AbstractFunction implements 
GenericJoiner<Record, Record, Record> {
+public abstract class JoinFunction extends AbstractRichFunction implements 
FlatJoinFunction<Record, Record, Record> {
        
        private static final long serialVersionUID = 1L;
        
@@ -38,7 +38,7 @@ public abstract class JoinFunction extends AbstractFunction 
implements GenericJo
         * 
         * @param value1 The record that comes from the first input.
         * @param value2 The record that comes from the second input.
-        * @param out A collector that collects all output pairs.
+        * @return The result of the join UDF as record
         * 
         * @throws Exception Implementations may forward exceptions, which are 
caught by the runtime. When the
         *                   runtime catches an exception, it aborts the 
combine task and lets the fail-over logic

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
index 88b6282..99c945d 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.api.java.record.functions;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.GenericCollectorMap;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
@@ -28,7 +28,7 @@ import org.apache.flink.util.Collector;
  * The MapFunction must be extended to provide a mapper implementation
  * By definition, the mapper is called for each individual input record.
  */
-public abstract class MapFunction extends AbstractFunction implements 
GenericCollectorMap<Record, Record> {
+public abstract class MapFunction extends AbstractRichFunction implements 
GenericCollectorMap<Record, Record> {
        
        private static final long serialVersionUID = 1L;
        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
index 4b1dbb3..073b11a 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
@@ -21,9 +21,9 @@ package org.apache.flink.api.java.record.functions;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.common.functions.GenericCombine;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
@@ -31,7 +31,7 @@ import org.apache.flink.util.Collector;
  * The ReduceFunction must be extended to provide a reducer implementation, as 
invoked by a
  * {@link org.apache.flink.api.java.operators.ReduceOperator}.
  */
-public abstract class ReduceFunction extends AbstractFunction implements 
GenericGroupReduce<Record, Record>, GenericCombine<Record> {
+public abstract class ReduceFunction extends AbstractRichFunction implements 
GroupReduceFunction<Record, Record>, FlatCombineFunction<Record> {
        
        private static final long serialVersionUID = 1L;
        

Reply via email to