http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index a41874c..e8ee0bb 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -31,12 +31,13 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang3.Validate;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
-import org.apache.flink.api.common.functions.GenericCrosser;
-import org.apache.flink.api.common.functions.GenericFlatMap;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
-import org.apache.flink.api.common.functions.GenericJoiner;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.java.functions.InvalidTypesException;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -60,64 +61,75 @@ public class TypeExtractor {
        // 
--------------------------------------------------------------------------------------------
        
        @SuppressWarnings("unchecked")
-       public static <IN, OUT> TypeInformation<OUT> 
getMapReturnTypes(GenericMap<IN, OUT> mapInterface, TypeInformation<IN> inType) 
{
-               validateInputType(GenericMap.class, mapInterface.getClass(), 0, 
inType);
+       public static <IN, OUT> TypeInformation<OUT> 
getMapReturnTypes(MapFunction<IN, OUT> mapInterface, TypeInformation<IN> 
inType) {
+               validateInputType(MapFunction.class, mapInterface.getClass(), 
0, inType);
                if(mapInterface instanceof ResultTypeQueryable) {
                        return ((ResultTypeQueryable<OUT>) 
mapInterface).getProducedType();
                }
-               return new 
TypeExtractor().privateCreateTypeInfo(GenericMap.class, 
mapInterface.getClass(), 1, inType, null);
+               return new 
TypeExtractor().privateCreateTypeInfo(MapFunction.class, 
mapInterface.getClass(), 1, inType, null);
        }
        
        @SuppressWarnings("unchecked")
-       public static <IN, OUT> TypeInformation<OUT> 
getFlatMapReturnTypes(GenericFlatMap<IN, OUT> flatMapInterface, 
TypeInformation<IN> inType) {
-               validateInputType(GenericFlatMap.class, 
flatMapInterface.getClass(), 0, inType);
+       public static <IN, OUT> TypeInformation<OUT> 
getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, 
TypeInformation<IN> inType) {
+               validateInputType(FlatMapFunction.class, 
flatMapInterface.getClass(), 0, inType);
                if(flatMapInterface instanceof ResultTypeQueryable) {
                        return ((ResultTypeQueryable<OUT>) 
flatMapInterface).getProducedType();
                }
-               return new 
TypeExtractor().privateCreateTypeInfo(GenericFlatMap.class, 
flatMapInterface.getClass(), 1, inType, null);
+               return new 
TypeExtractor().privateCreateTypeInfo(FlatMapFunction.class, 
flatMapInterface.getClass(), 1, inType, null);
        }
        
        @SuppressWarnings("unchecked")
-       public static <IN, OUT> TypeInformation<OUT> 
getGroupReduceReturnTypes(GenericGroupReduce<IN, OUT> groupReduceInterface,
+       public static <IN, OUT> TypeInformation<OUT> 
getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface,
                        TypeInformation<IN> inType) {
-               validateInputType(GenericGroupReduce.class, 
groupReduceInterface.getClass(), 0, inType);
+               validateInputType(GroupReduceFunction.class, 
groupReduceInterface.getClass(), 0, inType);
                if(groupReduceInterface instanceof ResultTypeQueryable) {
                        return ((ResultTypeQueryable<OUT>) 
groupReduceInterface).getProducedType();
                }
-               return new 
TypeExtractor().privateCreateTypeInfo(GenericGroupReduce.class, 
groupReduceInterface.getClass(), 1, inType, null);
+               return new 
TypeExtractor().privateCreateTypeInfo(GroupReduceFunction.class, 
groupReduceInterface.getClass(), 1, inType, null);
        }
        
        @SuppressWarnings("unchecked")
-       public static <IN1, IN2, OUT> TypeInformation<OUT> 
getJoinReturnTypes(GenericJoiner<IN1, IN2, OUT> joinInterface,
+       public static <IN1, IN2, OUT> TypeInformation<OUT> 
getJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface,
                        TypeInformation<IN1> in1Type, TypeInformation<IN2> 
in2Type) {
-               validateInputType(GenericJoiner.class, 
joinInterface.getClass(), 0, in1Type);
-               validateInputType(GenericJoiner.class, 
joinInterface.getClass(), 1, in2Type);
+               validateInputType(FlatJoinFunction.class, 
joinInterface.getClass(), 0, in1Type);
+               validateInputType(FlatJoinFunction.class, 
joinInterface.getClass(), 1, in2Type);
                if(joinInterface instanceof ResultTypeQueryable) {
                        return ((ResultTypeQueryable<OUT>) 
joinInterface).getProducedType();
                }
-               return new 
TypeExtractor().privateCreateTypeInfo(GenericJoiner.class, 
joinInterface.getClass(), 2, in1Type, in2Type);
+               return new 
TypeExtractor().privateCreateTypeInfo(FlatJoinFunction.class, 
joinInterface.getClass(), 2, in1Type, in2Type);
+       }
+
+       @SuppressWarnings("unchecked")
+       public static <IN1, IN2, OUT> TypeInformation<OUT> 
getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> joinInterface,
+                       TypeInformation<IN1> in1Type, TypeInformation<IN2> 
in2Type) {
+               validateInputType(JoinFunction.class, joinInterface.getClass(), 
0, in1Type);
+               validateInputType(JoinFunction.class, joinInterface.getClass(), 
1, in2Type);
+               if(joinInterface instanceof ResultTypeQueryable) {
+                       return ((ResultTypeQueryable<OUT>) 
joinInterface).getProducedType();
+               }
+               return new 
TypeExtractor().privateCreateTypeInfo(JoinFunction.class, 
joinInterface.getClass(), 2, in1Type, in2Type);
        }
        
        @SuppressWarnings("unchecked")
-       public static <IN1, IN2, OUT> TypeInformation<OUT> 
getCoGroupReturnTypes(GenericCoGrouper<IN1, IN2, OUT> coGroupInterface,
+       public static <IN1, IN2, OUT> TypeInformation<OUT> 
getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface,
                        TypeInformation<IN1> in1Type, TypeInformation<IN2> 
in2Type) {
-               validateInputType(GenericCoGrouper.class, 
coGroupInterface.getClass(), 0, in1Type);
-               validateInputType(GenericCoGrouper.class, 
coGroupInterface.getClass(), 1, in2Type);
+               validateInputType(CoGroupFunction.class, 
coGroupInterface.getClass(), 0, in1Type);
+               validateInputType(CoGroupFunction.class, 
coGroupInterface.getClass(), 1, in2Type);
                if(coGroupInterface instanceof ResultTypeQueryable) {
                        return ((ResultTypeQueryable<OUT>) 
coGroupInterface).getProducedType();
                }
-               return new 
TypeExtractor().privateCreateTypeInfo(GenericCoGrouper.class, 
coGroupInterface.getClass(), 2, in1Type, in2Type);
+               return new 
TypeExtractor().privateCreateTypeInfo(CoGroupFunction.class, 
coGroupInterface.getClass(), 2, in1Type, in2Type);
        }
        
        @SuppressWarnings("unchecked")
-       public static <IN1, IN2, OUT> TypeInformation<OUT> 
getCrossReturnTypes(GenericCrosser<IN1, IN2, OUT> crossInterface,
+       public static <IN1, IN2, OUT> TypeInformation<OUT> 
getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface,
                        TypeInformation<IN1> in1Type, TypeInformation<IN2> 
in2Type) {
-               validateInputType(GenericCrosser.class, 
crossInterface.getClass(), 0, in1Type);
-               validateInputType(GenericCrosser.class, 
crossInterface.getClass(), 1, in2Type);
+               validateInputType(CrossFunction.class, 
crossInterface.getClass(), 0, in1Type);
+               validateInputType(CrossFunction.class, 
crossInterface.getClass(), 1, in2Type);
                if(crossInterface instanceof ResultTypeQueryable) {
                        return ((ResultTypeQueryable<OUT>) 
crossInterface).getProducedType();
                }
-               return new 
TypeExtractor().privateCreateTypeInfo(GenericCrosser.class, 
crossInterface.getClass(), 2, in1Type, in2Type);
+               return new 
TypeExtractor().privateCreateTypeInfo(CrossFunction.class, 
crossInterface.getClass(), 2, in1Type, in2Type);
        }
        
        @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
index 8e0abcb..c786345 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
@@ -235,7 +235,7 @@ public final class TupleComparator<T extends Tuple> extends 
TypeComparator<T> im
                try {
                        for (; i < keyPositions.length; i++) {
                                int keyPos = keyPositions[i];
-                               int cmp = 
comparators[i].compare(first.getField(keyPos), second.getField(keyPos));
+                               int cmp = 
comparators[i].compare((T)first.getField(keyPos), (T)second.getField(keyPos));
                                if (cmp != 0) {
                                        return cmp;
                                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
index 1159512..474b022 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
@@ -28,8 +28,6 @@ import 
org.apache.flink.api.common.operators.base.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.java.functions.JoinFunction;
-import org.apache.flink.api.java.functions.MapFunction;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
 import 
org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst;
 import 
org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond;
@@ -284,7 +282,7 @@ public class SemanticPropertiesTranslationTest {
        
        
        @ConstantFields("*")
-       public static class WildcardConstantMapper<T> extends MapFunction<T, T> 
{
+       public static class WildcardConstantMapper<T> extends 
RichMapFunction<T, T> {
 
                @Override
                public T map(T value)  {
@@ -293,7 +291,7 @@ public class SemanticPropertiesTranslationTest {
        }
        
        @ConstantFields("0->0;1->1;2->2")
-       public static class IndividualConstantMapper<X, Y, Z> extends 
MapFunction<Tuple3<X, Y, Z>, Tuple3<X, Y, Z>> {
+       public static class IndividualConstantMapper<X, Y, Z> extends 
RichMapFunction<Tuple3<X, Y, Z>, Tuple3<X, Y, Z>> {
 
                @Override
                public Tuple3<X, Y, Z> map(Tuple3<X, Y, Z> value) {
@@ -302,7 +300,7 @@ public class SemanticPropertiesTranslationTest {
        }
        
        @ConstantFields("0")
-       public static class ZeroConstantMapper<T> extends MapFunction<T, T> {
+       public static class ZeroConstantMapper<T> extends RichMapFunction<T, T> 
{
 
                @Override
                public T map(T value)  {
@@ -312,7 +310,7 @@ public class SemanticPropertiesTranslationTest {
        
        @ConstantFieldsFirst("1 -> 0")
        @ConstantFieldsSecond("1 -> 1")
-       public static class ForwardingTupleJoin<A, B, C, D> extends 
JoinFunction<Tuple2<A, B>, Tuple2<C, D>, Tuple2<B, D>> {
+       public static class ForwardingTupleJoin<A, B, C, D> extends 
RichJoinFunction<Tuple2<A, B>, Tuple2<C, D>, Tuple2<B, D>> {
 
                @Override
                public Tuple2<B, D> join(Tuple2<A, B> first, Tuple2<C, D> 
second) {
@@ -322,7 +320,7 @@ public class SemanticPropertiesTranslationTest {
        
        @ConstantFieldsFirst("0 -> 0")
        @ConstantFieldsSecond("0 -> 1")
-       public static class ForwardingBasicJoin<A, B> extends JoinFunction<A, 
B, Tuple2<A, B>> {
+       public static class ForwardingBasicJoin<A, B> extends 
RichJoinFunction<A, B, Tuple2<A, B>> {
 
                @Override
                public Tuple2<A, B> join(A first, B second) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
index db795d9..155bbd1 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
@@ -35,9 +35,9 @@ import 
org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.CoGroupFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.Collector;
@@ -128,8 +128,14 @@ public class DeltaIterationTranslationTest implements 
java.io.Serializable {
                        
                        assertEquals(IdentityMapper.class, 
worksetMapper.getUserCodeWrapper().getUserCodeClass());
                        assertEquals(NextWorksetMapper.class, 
nextWorksetMapper.getUserCodeWrapper().getUserCodeClass());
-                       assertEquals(SolutionWorksetJoin.class, 
solutionSetJoin.getUserCodeWrapper().getUserCodeClass());
-                       
+                       if 
(solutionSetJoin.getUserCodeWrapper().getUserCodeObject() instanceof 
WrappingFunction) {
+                               WrappingFunction wf = (WrappingFunction) 
solutionSetJoin.getUserCodeWrapper().getUserCodeObject();
+                               assertEquals(SolutionWorksetJoin.class, 
wf.getWrappedFunction().getClass());
+                       }
+                       else {
+                               assertEquals(SolutionWorksetJoin.class, 
solutionSetJoin.getUserCodeWrapper().getUserCodeClass());
+                       }
+
                        assertEquals(BEFORE_NEXT_WORKSET_MAP, 
nextWorksetMapper.getName());
                        
                        assertEquals(AGGREGATOR_NAME, 
iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
@@ -215,21 +221,21 @@ public class DeltaIterationTranslationTest implements 
java.io.Serializable {
        
        // 
--------------------------------------------------------------------------------------------
        
-       public static class SolutionWorksetJoin extends 
JoinFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, 
Tuple3<Double, Long, String>> {
+       public static class SolutionWorksetJoin extends 
RichJoinFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, 
Tuple3<Double, Long, String>> {
                @Override
                public Tuple3<Double, Long, String> join(Tuple2<Double, String> 
first, Tuple3<Double, Long, String> second){
                        return null;
                }
        }
        
-       public static class NextWorksetMapper extends 
MapFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>> {
+       public static class NextWorksetMapper extends 
RichMapFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>> {
                @Override
                public Tuple2<Double, String> map(Tuple3<Double, Long, String> 
value) {
                        return null;
                }
        }
        
-       public static class IdentityMapper<T> extends MapFunction<T, T> {
+       public static class IdentityMapper<T> extends RichMapFunction<T, T> {
 
                @Override
                public T map(T value) throws Exception {
@@ -237,7 +243,7 @@ public class DeltaIterationTranslationTest implements 
java.io.Serializable {
                }
        }
        
-       public static class SolutionWorksetCoGroup1 extends 
CoGroupFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, 
Tuple3<Double, Long, String>> {
+       public static class SolutionWorksetCoGroup1 extends 
RichCoGroupFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, 
Tuple3<Double, Long, String>> {
 
                @Override
                public void coGroup(Iterator<Tuple2<Double, String>> first, 
Iterator<Tuple3<Double, Long, String>> second,
@@ -245,7 +251,7 @@ public class DeltaIterationTranslationTest implements 
java.io.Serializable {
                }
        }
        
-       public static class SolutionWorksetCoGroup2 extends 
CoGroupFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>, 
Tuple3<Double, Long, String>> {
+       public static class SolutionWorksetCoGroup2 extends 
RichCoGroupFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>, 
Tuple3<Double, Long, String>> {
 
                @Override
                public void coGroup(Iterator<Tuple3<Double, Long, String>> 
second, Iterator<Tuple2<Double, String>> first,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
index 9f6a6d8..8e457ce 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
@@ -27,9 +27,7 @@ import 
org.apache.flink.api.common.operators.base.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.ReduceFunction;
-import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
-import 
org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator;
+import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -53,7 +51,7 @@ public class ReduceTranslationTests implements 
java.io.Serializable {
                        
                        DataSet<Tuple3<Double, StringValue, LongValue>> 
initialData = getSourceDataSet(env);
                        
-                       initialData.reduce(new 
ReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+                       initialData.reduce(new 
RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
                                public Tuple3<Double, StringValue, LongValue> 
reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, 
StringValue, LongValue> value2) {
                                        return value1;
                                }
@@ -94,7 +92,7 @@ public class ReduceTranslationTests implements 
java.io.Serializable {
                        
                        initialData
                                .groupBy(2)
-                               .reduce(new 
ReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+                               .reduce(new 
RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
                                        public Tuple3<Double, StringValue, 
LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, 
StringValue, LongValue> value2) {
                                                return value1;
                                        }
@@ -141,7 +139,7 @@ public class ReduceTranslationTests implements 
java.io.Serializable {
                                                return value.f1;
                                        }
                                })
-                               .reduce(new 
ReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+                               .reduce(new 
RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
                                        public Tuple3<Double, StringValue, 
LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, 
StringValue, LongValue> value2) {
                                                return value1;
                                        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index b284052..c6ad73d 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -23,16 +23,16 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.functions.CoGroupFunction;
-import org.apache.flink.api.java.functions.CrossFunction;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
+import org.apache.flink.api.java.functions.RichCrossFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.InvalidTypesException;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichFlatJoinFunction;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -68,7 +68,7 @@ public class TypeExtractorTest {
        @Test
        public void testBasicType() {
                // use getGroupReduceReturnTypes()
-               GroupReduceFunction<?, ?> function = new 
GroupReduceFunction<Boolean, Boolean>() {
+               RichGroupReduceFunction<?, ?> function = new 
RichGroupReduceFunction<Boolean, Boolean>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -107,7 +107,7 @@ public class TypeExtractorTest {
        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Test
        public void testWritableType() {
-               MapFunction<?, ?> function = new MapFunction<MyWritable, 
MyWritable>() {
+               RichMapFunction<?, ?> function = new 
RichMapFunction<MyWritable, MyWritable>() {
                        private static final long serialVersionUID = 1L;
                        
                        @Override
@@ -127,7 +127,7 @@ public class TypeExtractorTest {
        @Test
        public void testTupleWithBasicTypes() throws Exception {
                // use getMapReturnTypes()
-               MapFunction<?, ?> function = new MapFunction<Tuple9<Integer, 
Long, Double, Float, Boolean, String, Character, Short, Byte>, Tuple9<Integer, 
Long, Double, Float, Boolean, String, Character, Short, Byte>>() {
+               RichMapFunction<?, ?> function = new 
RichMapFunction<Tuple9<Integer, Long, Double, Float, Boolean, String, 
Character, Short, Byte>, Tuple9<Integer, Long, Double, Float, Boolean, String, 
Character, Short, Byte>>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -192,7 +192,7 @@ public class TypeExtractorTest {
        @Test
        public void testTupleWithTuples() {
                // use getFlatMapReturnTypes()
-               FlatMapFunction<?, ?> function = new 
FlatMapFunction<Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>, 
Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>>() {
+               RichFlatMapFunction<?, ?> function = new 
RichFlatMapFunction<Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, 
Long>>, Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -247,12 +247,12 @@ public class TypeExtractorTest {
        @Test
        public void testSubclassOfTuple() {
                // use getJoinReturnTypes()
-               JoinFunction<?, ?, ?> function = new JoinFunction<CustomTuple, 
String, CustomTuple>() {
+               RichFlatJoinFunction<?, ?, ?> function = new 
RichFlatJoinFunction<CustomTuple, String, CustomTuple>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
-                       public CustomTuple join(CustomTuple first, String 
second) throws Exception {
-                               return null;
+                       public void join(CustomTuple first, String second, 
Collector<CustomTuple> out) throws Exception {
+                               out.collect(null);
                        }                       
                };
 
@@ -295,7 +295,7 @@ public class TypeExtractorTest {
        @Test
        public void testCustomType() {
                // use getCrossReturnTypes()
-               CrossFunction<?, ?, ?> function = new CrossFunction<CustomType, 
Integer, CustomType>() {
+               RichCrossFunction<?, ?, ?> function = new 
RichCrossFunction<CustomType, Integer, CustomType>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -342,7 +342,7 @@ public class TypeExtractorTest {
        @Test
        public void testTupleWithCustomType() {
                // use getMapReturnTypes()
-               MapFunction<?, ?> function = new MapFunction<Tuple2<Long, 
CustomType>, Tuple2<Long, CustomType>>() {
+               RichMapFunction<?, ?> function = new 
RichMapFunction<Tuple2<Long, CustomType>, Tuple2<Long, CustomType>>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -412,7 +412,7 @@ public class TypeExtractorTest {
        @Test
        public void testTupleOfValues() {
                // use getMapReturnTypes()
-               MapFunction<?, ?> function = new 
MapFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>() {
+               RichMapFunction<?, ?> function = new 
RichMapFunction<Tuple2<StringValue, IntValue>, Tuple2<StringValue, IntValue>>() 
{
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -451,7 +451,7 @@ public class TypeExtractorTest {
        @Test
        public void testGenericsNotInSuperclass() {
                // use getMapReturnTypes()
-               MapFunction<?, ?> function = new 
MapFunction<LongKeyValue<String>, LongKeyValue<String>>() {
+               RichMapFunction<?, ?> function = new 
RichMapFunction<LongKeyValue<String>, LongKeyValue<String>>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -494,7 +494,7 @@ public class TypeExtractorTest {
        @Test
        public void testChainedGenericsNotInSuperclass() {
                // use TypeExtractor
-               MapFunction<?, ?> function = new 
MapFunction<ChainedTwo<Integer>, ChainedTwo<Integer>>() {
+               RichMapFunction<?, ?> function = new 
RichMapFunction<ChainedTwo<Integer>, ChainedTwo<Integer>>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -536,7 +536,7 @@ public class TypeExtractorTest {
        @Test
        public void testGenericsInDirectSuperclass() {
                // use TypeExtractor
-               MapFunction<?, ?> function = new MapFunction<ChainedThree, 
ChainedThree>() {
+               RichMapFunction<?, ?> function = new 
RichMapFunction<ChainedThree, ChainedThree>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -562,7 +562,7 @@ public class TypeExtractorTest {
        @Test
        public void testGenericsNotInSuperclassWithNonGenericClassAtEnd() {
                // use TypeExtractor
-               MapFunction<?, ?> function = new MapFunction<ChainedFour, 
ChainedFour>() {
+               RichMapFunction<?, ?> function = new 
RichMapFunction<ChainedFour, ChainedFour>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -587,7 +587,7 @@ public class TypeExtractorTest {
        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Test
        public void testMissingTupleGenericsException() {
-               MapFunction<?, ?> function = new MapFunction<String, Tuple2>() {
+               RichMapFunction<?, ?> function = new RichMapFunction<String, 
Tuple2>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -607,7 +607,7 @@ public class TypeExtractorTest {
        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Test
        public void testTupleSupertype() {
-               MapFunction<?, ?> function = new MapFunction<String, Tuple>() {
+               RichMapFunction<?, ?> function = new RichMapFunction<String, 
Tuple>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -635,7 +635,7 @@ public class TypeExtractorTest {
        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Test
        public void testSameGenericVariable() {
-               MapFunction<?, ?> function = new 
MapFunction<SameTypeVariable<String>, SameTypeVariable<String>>() {
+               RichMapFunction<?, ?> function = new 
RichMapFunction<SameTypeVariable<String>, SameTypeVariable<String>>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -667,7 +667,7 @@ public class TypeExtractorTest {
        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Test
        public void testNestedTupleGenerics() {
-               MapFunction<?, ?> function = new MapFunction<Nested<String, 
Integer>, Nested<String, Integer>>() {
+               RichMapFunction<?, ?> function = new 
RichMapFunction<Nested<String, Integer>, Nested<String, Integer>>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -706,7 +706,7 @@ public class TypeExtractorTest {
        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Test
        public void testNestedTupleGenerics2() {
-               MapFunction<?, ?> function = new MapFunction<Nested2<Boolean>, 
Nested2<Boolean>>() {
+               RichMapFunction<?, ?> function = new 
RichMapFunction<Nested2<Boolean>, Nested2<Boolean>>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -746,7 +746,7 @@ public class TypeExtractorTest {
        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Test
        public void testFunctionWithMissingGenerics() {
-               MapFunction function = new MapFunction() {
+               RichMapFunction function = new RichMapFunction() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -776,7 +776,7 @@ public class TypeExtractorTest {
                Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti);
        }
 
-       public class IdentityMapper<T> extends MapFunction<T, T> {
+       public class IdentityMapper<T> extends RichMapFunction<T, T> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -807,7 +807,7 @@ public class TypeExtractorTest {
                }
        }
 
-       public class IdentityMapper2<T> extends MapFunction<Tuple2<T, String>, 
T> {
+       public class IdentityMapper2<T> extends RichMapFunction<Tuple2<T, 
String>, T> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -843,7 +843,7 @@ public class TypeExtractorTest {
                Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
tti.getTypeAt(1));
        }
 
-       public class IdentityMapper3<T, V> extends MapFunction<T, V> {
+       public class IdentityMapper3<T, V> extends RichMapFunction<T, V> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -916,7 +916,7 @@ public class TypeExtractorTest {
        @SuppressWarnings({ "rawtypes", "unchecked" })
        @Test
        public void testFunctionWithNoGenericSuperclass() {
-               MapFunction<?, ?> function = new Mapper2();
+               RichMapFunction<?, ?> function = new Mapper2();
 
                TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation) 
TypeInfoParser.parse("String"));
 
@@ -924,7 +924,7 @@ public class TypeExtractorTest {
                Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
        }
 
-       public class OneAppender<T> extends MapFunction<T, Tuple2<T, Integer>> {
+       public class OneAppender<T> extends RichMapFunction<T, Tuple2<T, 
Integer>> {
                private static final long serialVersionUID = 1L;
 
                public Tuple2<T, Integer> map(T value) {
@@ -935,7 +935,7 @@ public class TypeExtractorTest {
        @SuppressWarnings({ "rawtypes", "unchecked" })
        @Test
        public void testFunctionDependingPartialOnInput() {
-               MapFunction<?, ?> function = new OneAppender<DoubleValue>() {
+               RichMapFunction<?, ?> function = new OneAppender<DoubleValue>() 
{
                        private static final long serialVersionUID = 1L;
                };
 
@@ -955,7 +955,7 @@ public class TypeExtractorTest {
 
        @Test
        public void testFunctionDependingPartialOnInput2() {
-               MapFunction<DoubleValue, ?> function = new 
OneAppender<DoubleValue>();
+               RichMapFunction<DoubleValue, ?> function = new 
OneAppender<DoubleValue>();
 
                TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, new 
ValueTypeInfo<DoubleValue>(DoubleValue.class));
 
@@ -971,7 +971,7 @@ public class TypeExtractorTest {
                Assert.assertEquals(Integer.class , 
tti.getTypeAt(1).getTypeClass());
        }
 
-       public class FieldDuplicator<T> extends MapFunction<T, Tuple2<T, T>> {
+       public class FieldDuplicator<T> extends RichMapFunction<T, Tuple2<T, 
T>> {
                private static final long serialVersionUID = 1L;
 
                public Tuple2<T, T> map(T value) {
@@ -981,7 +981,7 @@ public class TypeExtractorTest {
 
        @Test
        public void testFunctionInputInOutputMultipleTimes() {
-               MapFunction<Float, ?> function = new FieldDuplicator<Float>();
+               RichMapFunction<Float, ?> function = new 
FieldDuplicator<Float>();
 
                TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, BasicTypeInfo.FLOAT_TYPE_INFO);
 
@@ -994,7 +994,7 @@ public class TypeExtractorTest {
 
        @Test
        public void testFunctionInputInOutputMultipleTimes2() {
-               MapFunction<Tuple2<Float, Float>, ?> function = new 
FieldDuplicator<Tuple2<Float, Float>>();
+               RichMapFunction<Tuple2<Float, Float>, ?> function = new 
FieldDuplicator<Tuple2<Float, Float>>();
 
                TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, new TupleTypeInfo<Tuple2<Float, 
Float>>(
                                BasicTypeInfo.FLOAT_TYPE_INFO, 
BasicTypeInfo.FLOAT_TYPE_INFO));
@@ -1023,7 +1023,7 @@ public class TypeExtractorTest {
 
        @Test
        public void testAbstractAndInterfaceTypesException() {
-               MapFunction<String, ?> function = new MapFunction<String, 
Testable>() {
+               RichMapFunction<String, ?> function = new 
RichMapFunction<String, Testable>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -1039,7 +1039,7 @@ public class TypeExtractorTest {
                        // good
                }
 
-               MapFunction<String, ?> function2 = new MapFunction<String, 
AbstractClass>() {
+               RichMapFunction<String, ?> function2 = new 
RichMapFunction<String, AbstractClass>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -1059,7 +1059,7 @@ public class TypeExtractorTest {
        @SuppressWarnings({ "rawtypes", "unchecked" })
        @Test
        public void testValueSupertypeException() {
-               MapFunction<?, ?> function = new MapFunction<StringValue, 
Value>() {
+               RichMapFunction<?, ?> function = new 
RichMapFunction<StringValue, Value>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -1080,7 +1080,7 @@ public class TypeExtractorTest {
        @Test
        public void testBasicArray() {
                // use getCoGroupReturnTypes()
-               CoGroupFunction<?, ?, ?> function = new 
CoGroupFunction<String[], String[], String[]>() {
+               RichCoGroupFunction<?, ?, ?> function = new 
RichCoGroupFunction<String[], String[], String[]>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -1107,7 +1107,7 @@ public class TypeExtractorTest {
 
        @Test
        public void testBasicArray2() {
-               MapFunction<Boolean[], ?> function = new 
IdentityMapper<Boolean[]>();
+               RichMapFunction<Boolean[], ?> function = new 
IdentityMapper<Boolean[]>();
 
                TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, 
BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO);
 
@@ -1122,7 +1122,7 @@ public class TypeExtractorTest {
        @SuppressWarnings({ "rawtypes", "unchecked" })
        @Test
        public void testCustomArray() {
-               MapFunction<?, ?> function = new 
MapFunction<CustomArrayObject[], CustomArrayObject[]>() {
+               RichMapFunction<?, ?> function = new 
RichMapFunction<CustomArrayObject[], CustomArrayObject[]>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -1140,7 +1140,7 @@ public class TypeExtractorTest {
        @SuppressWarnings({ "rawtypes", "unchecked" })
        @Test
        public void testTupleArray() {
-               MapFunction<?, ?> function = new MapFunction<Tuple2<String, 
String>[], Tuple2<String, String>[]>() {
+               RichMapFunction<?, ?> function = new 
RichMapFunction<Tuple2<String, String>[], Tuple2<String, String>[]>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -1167,7 +1167,7 @@ public class TypeExtractorTest {
        @SuppressWarnings({ "rawtypes", "unchecked" })
        @Test
        public void testCustomArrayWithTypeVariable() {
-               MapFunction<CustomArrayObject2<Boolean>[], ?> function = new 
IdentityMapper<CustomArrayObject2<Boolean>[]>();
+               RichMapFunction<CustomArrayObject2<Boolean>[], ?> function = 
new IdentityMapper<CustomArrayObject2<Boolean>[]>();
 
                TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(function, (TypeInformation) 
TypeInfoParser.parse("Tuple1<Boolean>[]"));
 
@@ -1178,7 +1178,7 @@ public class TypeExtractorTest {
                Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, 
tti.getTypeAt(0));
        }
        
-       public class GenericArrayClass<T> extends MapFunction<T[], T[]> {
+       public class GenericArrayClass<T> extends RichMapFunction<T[], T[]> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -1207,7 +1207,7 @@ public class TypeExtractorTest {
        @SuppressWarnings({ "rawtypes", "unchecked" })
        @Test
        public void testParamertizedCustomObject() {
-               MapFunction<?, ?> function = new MapFunction<MyObject<String>, 
MyObject<String>>() {
+               RichMapFunction<?, ?> function = new 
RichMapFunction<MyObject<String>, MyObject<String>>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -1242,7 +1242,7 @@ public class TypeExtractorTest {
        @Test
        public void testInputMismatchExceptions() {
                
-               MapFunction<?, ?> function = new MapFunction<Tuple2<String, 
String>, String>() {
+               RichMapFunction<?, ?> function = new 
RichMapFunction<Tuple2<String, String>, String>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -1265,7 +1265,7 @@ public class TypeExtractorTest {
                        // right
                }
                
-               MapFunction<?, ?> function2 = new MapFunction<StringValue, 
String>() {
+               RichMapFunction<?, ?> function2 = new 
RichMapFunction<StringValue, String>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -1281,7 +1281,7 @@ public class TypeExtractorTest {
                        // right
                }
                
-               MapFunction<?, ?> function3 = new 
MapFunction<Tuple1<Integer>[], String>() {
+               RichMapFunction<?, ?> function3 = new 
RichMapFunction<Tuple1<Integer>[], String>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -1297,7 +1297,7 @@ public class TypeExtractorTest {
                        // right
                }
                
-               MapFunction<?, ?> function4 = new MapFunction<Writable, 
String>() {
+               RichMapFunction<?, ?> function4 = new RichMapFunction<Writable, 
String>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -1314,7 +1314,7 @@ public class TypeExtractorTest {
                }
        }
        
-       public static class DummyFlatMapFunction<A,B,C,D> extends 
FlatMapFunction<Tuple2<A,B>, Tuple2<C,D>> {
+       public static class DummyFlatMapFunction<A,B,C,D> extends 
RichFlatMapFunction<Tuple2<A,B>, Tuple2<C,D>> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -1336,7 +1336,7 @@ public class TypeExtractorTest {
                }
        }
 
-       public static class MyQueryableMapper<A> extends MapFunction<String, A> 
implements ResultTypeQueryable<A> {
+       public static class MyQueryableMapper<A> extends 
RichMapFunction<String, A> implements ResultTypeQueryable<A> {
                private static final long serialVersionUID = 1L;
                
                @SuppressWarnings("unchecked")
@@ -1359,7 +1359,7 @@ public class TypeExtractorTest {
        
        @Test
        public void testTupleWithPrimitiveArray() {
-               MapFunction<Integer, 
Tuple9<int[],double[],long[],byte[],char[],float[],short[], boolean[], 
String[]>> function = new MapFunction<Integer, 
Tuple9<int[],double[],long[],byte[],char[],float[],short[], boolean[], 
String[]>>() {
+               RichMapFunction<Integer, 
Tuple9<int[],double[],long[],byte[],char[],float[],short[], boolean[], 
String[]>> function = new RichMapFunction<Integer, 
Tuple9<int[],double[],long[],byte[],char[],float[],short[], boolean[], 
String[]>>() {
                        private static final long serialVersionUID = 1L;
 
                        @Override
@@ -1382,8 +1382,8 @@ public class TypeExtractorTest {
        }
        
        @Test
-       public void testInterface() {
-               GenericMap<String, Boolean> mapInterface = new 
GenericMap<String, Boolean>() {
+       public void testFunction() {
+               RichMapFunction<String, Boolean> mapInterface = new 
RichMapFunction<String, Boolean>() {
                        
                        @Override
                        public void setRuntimeContext(RuntimeContext t) {
@@ -1392,7 +1392,6 @@ public class TypeExtractorTest {
                        
                        @Override
                        public void open(Configuration parameters) throws 
Exception {
-                               
                        }
                        
                        @Override
@@ -1414,4 +1413,17 @@ public class TypeExtractorTest {
                TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(mapInterface, BasicTypeInfo.STRING_TYPE_INFO);
                Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti);
        }
+
+       @Test
+       public void testInterface() {
+               MapFunction<String, Boolean> mapInterface = new 
MapFunction<String, Boolean>() {
+                       @Override
+                       public Boolean map(String record) throws Exception {
+                               return null;
+                       }
+               };
+
+               TypeInformation<?> ti = 
TypeExtractor.getMapReturnTypes(mapInterface, BasicTypeInfo.STRING_TYPE_INFO);
+               Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti);
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8-tests/pom.xml b/flink-java8-tests/pom.xml
new file mode 100644
index 0000000..2587776
--- /dev/null
+++ b/flink-java8-tests/pom.xml
@@ -0,0 +1,145 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-parent</artifactId>
+               <version>0.6-incubating-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-java8-tests</artifactId>
+       <name>flink-java8-tests</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-java</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>junit</groupId>
+                       <artifactId>junit</artifactId>
+                       <version>4.7</version>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+            <plugin>
+                <!-- just define the Java version to be used for compiling and 
plugins -->
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version><!--$NO-MVN-MAN-VER$-->
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                    <!-- High optimization, no debugging 
<compilerArgument>-g:none -O</compilerArgument> -->
+                </configuration>
+            </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <configuration>
+                                       <systemPropertyVariables>
+                                               <log.level>WARN</log.level>
+                                       </systemPropertyVariables>
+                                       <forkMode>once</forkMode>
+                                       <argLine>-Xmx1024m</argLine>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <artifactId>maven-failsafe-plugin</artifactId>
+                               <configuration>
+                                       <systemPropertyVariables>
+                                               <log.level>WARN</log.level>
+                                       </systemPropertyVariables>
+                                       <forkMode>always</forkMode>
+                                       <threadCount>1</threadCount>
+                                       
<perCoreThreadCount>false</perCoreThreadCount>
+                               </configuration>
+                       </plugin>
+               </plugins>
+               
+               <pluginManagement>
+                       <plugins>
+                               <!--This plugin's configuration is used to 
store Eclipse m2e settings only. It has no influence on the Maven build 
itself.-->
+                               <plugin>
+                                       <groupId>org.eclipse.m2e</groupId>
+                                       
<artifactId>lifecycle-mapping</artifactId>
+                                       <version>1.0.0</version>
+                                       <configuration>
+                                               <lifecycleMappingMetadata>
+                                                       <pluginExecutions>
+                                                               
<pluginExecution>
+                                                                       
<pluginExecutionFilter>
+                                                                               
<groupId>
+                                                                               
        org.apache.maven.plugins
+                                                                               
</groupId>
+                                                                               
<artifactId>
+                                                                               
        maven-assembly-plugin
+                                                                               
</artifactId>
+                                                                               
<versionRange>
+                                                                               
        [2.4,)
+                                                                               
</versionRange>
+                                                                               
<goals>
+                                                                               
        <goal>single</goal>
+                                                                               
</goals>
+                                                                       
</pluginExecutionFilter>
+                                                                       <action>
+                                                                               
<ignore></ignore>
+                                                                       
</action>
+                                                               
</pluginExecution>
+                                                       </pluginExecutions>
+                                               </lifecycleMappingMetadata>
+                                       </configuration>
+                               </plugin>
+                       </plugins>
+               </pluginManagement>
+       </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
 
b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
new file mode 100644
index 0000000..c417249
--- /dev/null
+++ 
b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CoGroupITCase.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 
org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+/**
+ * Test for using a Java 8 lambda expression as the function of a coGroup operation.
+ */
+public class CoGroupITCase implements Serializable {
+
+       /**
+        * Co-groups two small tuple data sets on field 0 with a lambda and executes the plan.
+        * An {@link UnsupportedLambdaExpressionException} counts as success; any other
+        * exception fails the test.
+        */
+       @Test
+       public void testCoGroupLambda() {
+               try {
+                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                       DataSet<Tuple2<Integer, String>> left = 
env.fromElements(
+                                       new Tuple2<Integer, String>(1, "hello"),
+                                       new Tuple2<Integer, String>(2, 
"what's"),
+                                       new Tuple2<Integer, String>(2, "up")
+                       );
+                       DataSet<Tuple2<Integer, String>> right = 
env.fromElements(
+                                       new Tuple2<Integer, String>(1, "not"),
+                                       new Tuple2<Integer, String>(1, "much"),
+                                       new Tuple2<Integer, String>(2, "really")
+                       );
+                       DataSet<Tuple2<Integer,String>> joined = 
left.coGroup(right).where(0).equalTo(0)
+                                       .with((values1, values2, out) -> {
+                                               int sum = 0;
+                                               String conc = "";
+                                               while (values1.hasNext()) {
+                                                       // fetch each element once: a second next() call would skip an element
+                                                       Tuple2<Integer, String> next = values1.next();
+                                                       sum += next.f0;
+                                                       conc += next.f1;
+                                               }
+                                               while (values2.hasNext()) {
+                                                       Tuple2<Integer, String> next = values2.next();
+                                                       sum += next.f0;
+                                                       conc += next.f1;
+                                               }
+                                       });
+                       env.execute();
+
+
+               } catch (UnsupportedLambdaExpressionException e) {
+                       // Success
+                       return;
+               } catch (Exception e) {
+                       // report the unexpected exception instead of failing without any detail
+                       Assert.fail("Unexpected exception: " + e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
 
b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
new file mode 100644
index 0000000..f8d217e
--- /dev/null
+++ 
b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/CrossITCase.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 
org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+/**
+ * Test for using a Java 8 lambda expression as the function of a cross operation.
+ */
+public class CrossITCase implements Serializable {
+
+       /**
+        * Defines a cross of two small tuple data sets with a lambda; the plan is not executed.
+        * An {@link UnsupportedLambdaExpressionException} counts as success; any other
+        * exception fails the test.
+        */
+       @Test
+       public void testCrossLambda() {
+               try {
+                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                       DataSet<Tuple2<Integer, String>> left = 
env.fromElements(
+                                       new Tuple2<Integer, String>(1, "hello"),
+                                       new Tuple2<Integer, String>(2, 
"what's"),
+                                       new Tuple2<Integer, String>(2, "up")
+                       );
+                       DataSet<Tuple2<Integer, String>> right = 
env.fromElements(
+                                       new Tuple2<Integer, String>(1, "not"),
+                                       new Tuple2<Integer, String>(1, "much"),
+                                       new Tuple2<Integer, String>(2, "really")
+                       );
+                       DataSet<Tuple2<Integer,String>> joined = 
left.cross(right)
+                                       .with((t,s) -> new Tuple2<Integer, 
String> (t.f0 + s.f0, t.f1 + " " + s.f1));
+
+               } catch (UnsupportedLambdaExpressionException e) {
+                       // Success
+                       return;
+               } catch (Exception e) {
+                       // report the unexpected exception instead of failing without any detail
+                       Assert.fail("Unexpected exception: " + e);
+               }
+       }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
 
b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
new file mode 100644
index 0000000..c775425
--- /dev/null
+++ 
b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FilterITCase.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Parameterized test that runs a {@code DataSet.filter} program written with a
+ * Java 8 lambda and compares the written output with an expected result.
+ * One test instance is created per configuration returned by
+ * {@link #getConfigurations()}.
+ */
+@RunWith(Parameterized.class)
+public class FilterITCase extends JavaProgramTestBase {
+
+
+       /**
+        * Builds the input data: a fixed list of 21 (Integer, Long, String)
+        * tuples that is shuffled before being turned into a data set.
+        *
+        * @param env environment used to create the data set
+        * @return data set holding the 21 tuples
+        */
+       public static DataSet<Tuple3<Integer, Long, String>> 
get3TupleDataSet(ExecutionEnvironment env) {
+
+               List<Tuple3<Integer, Long, String>> data = new 
ArrayList<Tuple3<Integer, Long, String>>();
+               data.add(new Tuple3<Integer, Long, String>(1,1l,"Hi"));
+               data.add(new Tuple3<Integer, Long, String>(2,2l,"Hello"));
+               data.add(new Tuple3<Integer, Long, String>(3,2l,"Hello world"));
+               data.add(new Tuple3<Integer, Long, String>(4,3l,"Hello world, 
how are you?"));
+               data.add(new Tuple3<Integer, Long, String>(5,3l,"I am fine."));
+               data.add(new Tuple3<Integer, Long, String>(6,3l,"Luke 
Skywalker"));
+               data.add(new Tuple3<Integer, Long, String>(7,4l,"Comment#1"));
+               data.add(new Tuple3<Integer, Long, String>(8,4l,"Comment#2"));
+               data.add(new Tuple3<Integer, Long, String>(9,4l,"Comment#3"));
+               data.add(new Tuple3<Integer, Long, String>(10,4l,"Comment#4"));
+               data.add(new Tuple3<Integer, Long, String>(11,5l,"Comment#5"));
+               data.add(new Tuple3<Integer, Long, String>(12,5l,"Comment#6"));
+               data.add(new Tuple3<Integer, Long, String>(13,5l,"Comment#7"));
+               data.add(new Tuple3<Integer, Long, String>(14,5l,"Comment#8"));
+               data.add(new Tuple3<Integer, Long, String>(15,5l,"Comment#9"));
+               data.add(new Tuple3<Integer, Long, String>(16,6l,"Comment#10"));
+               data.add(new Tuple3<Integer, Long, String>(17,6l,"Comment#11"));
+               data.add(new Tuple3<Integer, Long, String>(18,6l,"Comment#12"));
+               data.add(new Tuple3<Integer, Long, String>(19,6l,"Comment#13"));
+               data.add(new Tuple3<Integer, Long, String>(20,6l,"Comment#14"));
+               data.add(new Tuple3<Integer, Long, String>(21,6l,"Comment#15"));
+
+               // Randomize the element order of the input
+               Collections.shuffle(data);
+
+               return env.fromCollection(data);
+       }
+
+       // Number of program ids (1..NUM_PROGRAMS) generated by getConfigurations()
+       private static int NUM_PROGRAMS = 1;
+
+       // Program id taken from the "ProgramId" config entry; -1 when the entry is absent.
+       // NOTE(review): 'config' is not declared in this class - presumably inherited from JavaProgramTestBase; confirm
+       private int curProgId = config.getInteger("ProgramId", -1);
+       // Output location, assigned in preSubmit()
+       private String resultPath;
+       // Expected program output, assigned in testProgram()
+       private String expectedResult;
+
+       /**
+        * Creates a test instance for one configuration.
+        *
+        * @param config configuration handed on to the base class constructor
+        */
+       public FilterITCase(Configuration config) {
+               super(config);
+       }
+
+       /** Picks the path the program writes its result to. */
+       @Override
+       protected void preSubmit() throws Exception {
+               resultPath = getTempDirPath("result");
+       }
+
+       /** Runs the program selected by {@code curProgId} and remembers the result it is expected to produce. */
+       @Override
+       protected void testProgram() throws Exception {
+               expectedResult = FilterProgs.runProgram(curProgId, resultPath);
+       }
+
+       /** Compares the expected result with what was written to {@code resultPath}. */
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(expectedResult, resultPath);
+       }
+
+       /**
+        * Produces the test parameters: one {@link Configuration} per program id
+        * from 1 to {@code NUM_PROGRAMS}, each carrying its id under "ProgramId".
+        *
+        * @return the configurations converted by {@code toParameterList}
+        */
+       @Parameterized.Parameters
+       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
+
+               LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
+
+               for(int i=1; i <= NUM_PROGRAMS; i++) {
+                       Configuration config = new Configuration();
+                       config.setInteger("ProgramId", i);
+                       tConfigs.add(config);
+               }
+
+               return toParameterList(tConfigs);
+       }
+
+       /** Holds the test programs, selected by program id. */
+       private static class FilterProgs {
+
+               /**
+                * Runs the program with the given id and writes its output to {@code resultPath}.
+                *
+                * @param progId id of the program to run; only 1 is defined
+                * @param resultPath path the program writes its CSV output to
+                * @return the output the program is expected to produce
+                * @throws IllegalArgumentException if {@code progId} is not a known program id
+                */
+               public static String runProgram(int progId, String resultPath) 
throws Exception {
+
+                       switch(progId) {
+                               case 1: {
+                                       /*
+                                        * Test lambda filter
+                                        * Functionality identical to 
org.apache.flink.test.javaApiOperators.FilterITCase test 3
+                                        */
+
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                                       DataSet<Tuple3<Integer, Long, String>> 
ds = get3TupleDataSet(env);
+                                       // Keep only tuples whose string field contains "world"
+                                       DataSet<Tuple3<Integer, Long, String>> 
filterDs = ds.
+                                                       filter(value -> 
value.f2.contains("world"));
+                                       filterDs.writeAsCsv(resultPath);
+                                       env.execute();
+
+                                       // return expected result
+                                       return "3,2,Hello world\n" +
+                                                       "4,3,Hello world, how 
are you?\n";
+                               }
+                               default:
+                                       throw new 
IllegalArgumentException("Invalid program id");
+                       }
+
+               }
+
+       }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
 
b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
new file mode 100644
index 0000000..043b4e8
--- /dev/null
+++ 
b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatJoinITCase.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 
org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+/**
+ * Checks how {@code join(...).where(0).equalTo(0).with(...)} reacts to a
+ * Java 8 lambda that emits its results through a collector.
+ *
+ * <p>The program is only assembled, never executed. The test passes when the
+ * lambda is accepted or when {@link UnsupportedLambdaExpressionException} is
+ * thrown; any other exception fails it.
+ */
+public class FlatJoinITCase implements Serializable {
+
+       /**
+        * Joins two small tuple data sets on field 0 with a three-argument
+        * lambda that collects one tuple per matching pair.
+        */
+       @Test
+       public void testFlatJoinLambda() {
+               try {
+                       final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       DataSet<Tuple2<Integer, String>> left = env.fromElements(
+                                       new Tuple2<Integer, String>(1, "hello"),
+                                       new Tuple2<Integer, String>(2, "what's"),
+                                       new Tuple2<Integer, String>(2, "up")
+                       );
+                       DataSet<Tuple2<Integer, String>> right = env.fromElements(
+                                       new Tuple2<Integer, String>(1, "not"),
+                                       new Tuple2<Integer, String>(1, "much"),
+                                       new Tuple2<Integer, String>(2, "really")
+                       );
+                       DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
+                                       .with((t, s, out) -> out.collect(new Tuple2<Integer, String>(t.f0, t.f1 + " " + s.f1)));
+               } catch (UnsupportedLambdaExpressionException e) {
+                       // Success: rejecting the lambda with this exception is an accepted outcome
+                       return;
+               } catch (Exception e) {
+                       // Report what went wrong instead of failing without any message
+                       Assert.fail("Unexpected exception: " + e);
+               }
+       }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
 
b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
new file mode 100644
index 0000000..55f507c
--- /dev/null
+++ 
b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/FlatMapITCase.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 
org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+/**
+ * Checks how {@code DataSet.flatMap} reacts to a Java 8 lambda.
+ *
+ * <p>The test passes when the program runs through or when
+ * {@link UnsupportedLambdaExpressionException} is thrown; any other exception
+ * fails it.
+ */
+public class FlatMapITCase implements Serializable {
+
+       /**
+        * Applies a flat-map lambda that replaces "a" with "b" in every input
+        * string and then executes the program.
+        */
+       @Test
+       public void testFlatMapLambda() {
+               try {
+                       final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
+                       DataSet<String> flatMappedDs = stringDs.flatMap((s, out) -> out.collect(s.replace("a", "b")));
+                       // NOTE(review): no sink is attached to flatMappedDs before execute(); confirm this is intended
+                       env.execute();
+               } catch (UnsupportedLambdaExpressionException e) {
+                       // Success: rejecting the lambda with this exception is an accepted outcome
+                       return;
+               } catch (Exception e) {
+                       // Report what went wrong instead of failing without any message
+                       Assert.fail("Unexpected exception: " + e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
 
b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
new file mode 100644
index 0000000..494aff6
--- /dev/null
+++ 
b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/GroupReduceITCase.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 
org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+/**
+ * Checks how {@code reduceGroup} reacts to a Java 8 lambda, both on an
+ * ungrouped and on a grouped data set.
+ *
+ * <p>Each test passes when the program runs through or when
+ * {@link UnsupportedLambdaExpressionException} is thrown; any other exception
+ * fails it.
+ */
+public class GroupReduceITCase implements Serializable {
+
+       /**
+        * Reduces the whole (ungrouped) data set with a lambda that
+        * concatenates all strings into one.
+        */
+       @Test
+       public void testAllGroupReduceLambda() {
+               try {
+                       final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
+                       DataSet<String> concatDs = stringDs.reduceGroup((values, out) -> {
+                               String conc = "";
+                               while (values.hasNext()) {
+                                       String s = values.next();
+                                       conc = conc.concat(s);
+                               }
+                               out.collect(conc);
+                       });
+                       // NOTE(review): no sink is attached to concatDs before execute(); confirm this is intended
+                       env.execute();
+               } catch (UnsupportedLambdaExpressionException e) {
+                       // Success: rejecting the lambda with this exception is an accepted outcome
+                       return;
+               } catch (Exception e) {
+                       // Report what went wrong instead of failing without any message
+                       Assert.fail("Unexpected exception: " + e);
+               }
+       }
+
+       /**
+        * Groups tuples on field 0 and reduces each group with a lambda that
+        * concatenates the string fields of the group.
+        */
+       @Test
+       public void testGroupReduceLambda() {
+               try {
+                       final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       DataSet<Tuple2<Integer, String>> stringDs = env.fromElements(
+                                       new Tuple2<Integer, String>(1, "aa"),
+                                       new Tuple2<Integer, String>(2, "ab"),
+                                       new Tuple2<Integer, String>(1, "ac"),
+                                       new Tuple2<Integer, String>(2, "ad")
+                       );
+                       DataSet<String> concatDs = stringDs
+                                       .groupBy(0)
+                                       .reduceGroup((values, out) -> {
+                                               String conc = "";
+                                               while (values.hasNext()) {
+                                                       String s = values.next().f1;
+                                                       conc = conc.concat(s);
+                                               }
+                                               out.collect(conc);
+                                       });
+                       // NOTE(review): no sink is attached to concatDs before execute(); confirm this is intended
+                       env.execute();
+               } catch (UnsupportedLambdaExpressionException e) {
+                       // Success: rejecting the lambda with this exception is an accepted outcome
+                       return;
+               } catch (Exception e) {
+                       // Report what went wrong instead of failing without any message
+                       Assert.fail("Unexpected exception: " + e);
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
 
b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
new file mode 100644
index 0000000..3f4f696
--- /dev/null
+++ 
b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/JoinITCase.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 
org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+/**
+ * Checks how {@code join(...).where(0).equalTo(0).with(...)} reacts to a
+ * Java 8 lambda that returns the joined tuple.
+ *
+ * <p>The program is only assembled, never executed. The test passes when the
+ * lambda is accepted or when {@link UnsupportedLambdaExpressionException} is
+ * thrown; any other exception fails it.
+ */
+public class JoinITCase implements Serializable {
+
+       /**
+        * Joins two small tuple data sets on field 0 with a lambda that keeps
+        * the left key and concatenates the string fields of both sides.
+        */
+       @Test
+       public void testJoinLambda() {
+               try {
+                       final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       DataSet<Tuple2<Integer, String>> left = env.fromElements(
+                                       new Tuple2<Integer, String>(1, "hello"),
+                                       new Tuple2<Integer, String>(2, "what's"),
+                                       new Tuple2<Integer, String>(2, "up")
+                       );
+                       DataSet<Tuple2<Integer, String>> right = env.fromElements(
+                                       new Tuple2<Integer, String>(1, "not"),
+                                       new Tuple2<Integer, String>(1, "much"),
+                                       new Tuple2<Integer, String>(2, "really")
+                       );
+                       // Combine the string of the left element with the string of the right element
+                       DataSet<Tuple2<Integer, String>> joined = left.join(right).where(0).equalTo(0)
+                                       .with((t, s) -> new Tuple2<Integer, String>(t.f0, t.f1 + " " + s.f1));
+
+               } catch (UnsupportedLambdaExpressionException e) {
+                       // Success: rejecting the lambda with this exception is an accepted outcome
+                       return;
+               } catch (Exception e) {
+                       // Report what went wrong instead of failing without any message
+                       Assert.fail("Unexpected exception: " + e);
+               }
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
 
b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
new file mode 100644
index 0000000..3af360b
--- /dev/null
+++ 
b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 
org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+/**
+ * Checks how {@code DataSet.map} reacts to a Java 8 lambda.
+ *
+ * <p>The test passes when the program runs through or when
+ * {@link UnsupportedLambdaExpressionException} is thrown; any other exception
+ * fails it.
+ */
+public class MapITCase implements Serializable {
+
+       /**
+        * Applies a map lambda that replaces "a" with "b" in every input string
+        * and then executes the program.
+        */
+       @Test
+       public void TestMapLambda() {
+               try {
+                       final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       DataSet<String> stringDs = env.fromElements("aa", "ab", "ac", "ad");
+                       DataSet<String> mappedDs = stringDs.map(s -> s.replace("a", "b"));
+                       // NOTE(review): no sink is attached to mappedDs before execute(); confirm this is intended
+                       env.execute();
+               } catch (UnsupportedLambdaExpressionException e) {
+                       // Success: rejecting the lambda with this exception is an accepted outcome
+                       return;
+               } catch (Exception e) {
+                       // Report what went wrong instead of failing without any message
+                       Assert.fail("Unexpected exception: " + e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
 
b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
new file mode 100644
index 0000000..ab27fe4
--- /dev/null
+++ 
b/flink-java8-tests/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/ReduceITCase.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators.lambdas;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Parameterized test that runs a grouped {@code reduce} program written with a
+ * Java 8 lambda and compares the written output with an expected result.
+ * One test instance is created per configuration returned by
+ * {@link #getConfigurations()}.
+ */
+@RunWith(Parameterized.class)
+public class ReduceITCase extends JavaProgramTestBase {
+
+       /**
+        * Builds the input data: a fixed list of 15
+        * (Integer, Long, Integer, String, Long) tuples that is shuffled and then
+        * turned into a data set using explicitly constructed type information.
+        *
+        * @param env environment used to create the data set
+        * @return data set holding the 15 tuples
+        */
+       public static DataSet<Tuple5<Integer, Long, Integer, String, Long>> 
get5TupleDataSet(ExecutionEnvironment env) {
+
+               List<Tuple5<Integer, Long, Integer, String, Long>> data = new 
ArrayList<Tuple5<Integer, Long, Integer, String, Long>>();
+               data.add(new Tuple5<Integer, Long,  Integer, String, 
Long>(1,1l,0,"Hallo",1l));
+               data.add(new Tuple5<Integer, Long,  Integer, String, 
Long>(2,2l,1,"Hallo Welt",2l));
+               data.add(new Tuple5<Integer, Long,  Integer, String, 
Long>(2,3l,2,"Hallo Welt wie",1l));
+               data.add(new Tuple5<Integer, Long,  Integer, String, 
Long>(3,4l,3,"Hallo Welt wie gehts?",2l));
+               data.add(new Tuple5<Integer, Long,  Integer, String, 
Long>(3,5l,4,"ABC",2l));
+               data.add(new Tuple5<Integer, Long,  Integer, String, 
Long>(3,6l,5,"BCD",3l));
+               data.add(new Tuple5<Integer, Long,  Integer, String, 
Long>(4,7l,6,"CDE",2l));
+               data.add(new Tuple5<Integer, Long,  Integer, String, 
Long>(4,8l,7,"DEF",1l));
+               data.add(new Tuple5<Integer, Long,  Integer, String, 
Long>(4,9l,8,"EFG",1l));
+               data.add(new Tuple5<Integer, Long,  Integer, String, 
Long>(4,10l,9,"FGH",2l));
+               data.add(new Tuple5<Integer, Long,  Integer, String, 
Long>(5,11l,10,"GHI",1l));
+               data.add(new Tuple5<Integer, Long,  Integer, String, 
Long>(5,12l,11,"HIJ",3l));
+               data.add(new Tuple5<Integer, Long,  Integer, String, 
Long>(5,13l,12,"IJK",3l));
+               data.add(new Tuple5<Integer, Long,  Integer, String, 
Long>(5,14l,13,"JKL",2l));
+               data.add(new Tuple5<Integer, Long,  Integer, String, 
Long>(5,15l,14,"KLM",2l));
+
+               // Randomize the element order of the input
+               Collections.shuffle(data);
+
+               // Field types are stated explicitly and passed along with the collection
+               TupleTypeInfo<Tuple5<Integer, Long,  Integer, String, Long>> 
type = new
+                               TupleTypeInfo<Tuple5<Integer, Long,  Integer, 
String, Long>>(
+                               BasicTypeInfo.INT_TYPE_INFO,
+                               BasicTypeInfo.LONG_TYPE_INFO,
+                               BasicTypeInfo.INT_TYPE_INFO,
+                               BasicTypeInfo.STRING_TYPE_INFO,
+                               BasicTypeInfo.LONG_TYPE_INFO
+               );
+
+               return env.fromCollection(data, type);
+       }
+
+       // Number of program ids (1..NUM_PROGRAMS) generated by getConfigurations()
+       private static int NUM_PROGRAMS = 1;
+
+       // Program id taken from the "ProgramId" config entry; -1 when the entry is absent.
+       // NOTE(review): 'config' is not declared in this class - presumably inherited from JavaProgramTestBase; confirm
+       private int curProgId = config.getInteger("ProgramId", -1);
+       // Output location, assigned in preSubmit()
+       private String resultPath;
+       // Expected program output, assigned in testProgram()
+       private String expectedResult;
+
+       /**
+        * Creates a test instance for one configuration.
+        *
+        * @param config configuration handed on to the base class constructor
+        */
+       public ReduceITCase(Configuration config) {
+               super(config);
+       }
+
+       /** Picks the path the program writes its result to. */
+       @Override
+       protected void preSubmit() throws Exception {
+               resultPath = getTempDirPath("result");
+       }
+
+       /** Runs the program selected by {@code curProgId} and remembers the result it is expected to produce. */
+       @Override
+       protected void testProgram() throws Exception {
+               expectedResult = ReduceProgs.runProgram(curProgId, resultPath);
+       }
+
+       /** Compares the expected result with what was written to {@code resultPath}. */
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(expectedResult, resultPath);
+       }
+
+       /**
+        * Produces the test parameters: one {@link Configuration} per program id
+        * from 1 to {@code NUM_PROGRAMS}, each carrying its id under "ProgramId".
+        *
+        * @return the configurations converted by {@code toParameterList}
+        */
+       @Parameterized.Parameters
+       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
+
+               LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
+
+               for(int i=1; i <= NUM_PROGRAMS; i++) {
+                       Configuration config = new Configuration();
+                       config.setInteger("ProgramId", i);
+                       tConfigs.add(config);
+               }
+
+               return toParameterList(tConfigs);
+       }
+
+       /** Holds the test programs, selected by program id. */
+       private static class ReduceProgs {
+
+               /**
+                * Runs the program with the given id and writes its output to {@code resultPath}.
+                *
+                * @param progId id of the program to run; only 1 is defined
+                * @param resultPath path the program writes its CSV output to
+                * @return the output the program is expected to produce
+                * @throws IllegalArgumentException if {@code progId} is not a known program id
+                */
+               public static String runProgram(int progId, String resultPath) 
throws Exception {
+
+                       switch(progId) {
+                               case 1: {
+                                       /*
+                                        * Test reduce with lambda
+                                        * Functionality identical to 
org.apache.flink.test.javaApiOperators.ReduceITCase test 2
+                                        */
+
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                                       DataSet<Tuple5<Integer, Long, Integer, 
String, Long>> ds = get5TupleDataSet(env);
+                                       // Group on fields 4 and 0; each reduce step sums field 1,
+                                       // keeps fields 0 and 4 of the first input, and sets
+                                       // field 2 to 0 and field 3 to "P-)"
+                                       DataSet<Tuple5<Integer, Long, Integer, 
String, Long>> reduceDs = ds
+                                                       .groupBy(4, 0)
+                                                       .reduce((in1, in2) -> {
+                                                               Tuple5<Integer, 
Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, 
Long>();
+                                                               
out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
+                                                               return out;
+                                                       });
+
+                                       reduceDs.writeAsCsv(resultPath);
+                                       env.execute();
+
+                                       // return expected result
+                                       return "1,1,0,Hallo,1\n" +
+                                                       "2,3,2,Hallo Welt 
wie,1\n" +
+                                                       "2,2,1,Hallo Welt,2\n" +
+                                                       "3,9,0,P-),2\n" +
+                                                       "3,6,5,BCD,3\n" +
+                                                       "4,17,0,P-),1\n" +
+                                                       "4,17,0,P-),2\n" +
+                                                       "5,11,10,GHI,1\n" +
+                                                       "5,29,0,P-),2\n" +
+                                                       "5,25,0,P-),3\n";
+                               }
+                               default:
+                                       throw new 
IllegalArgumentException("Invalid program id");
+                       }
+
+               }
+
+       }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
index 636c492..34cd232 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -54,7 +54,7 @@ import java.io.IOException;
 /**
  * The base class for all tasks able to participate in an iteration.
  */
-public abstract class AbstractIterativePactTask<S extends Function, OT> extends RegularPactTask<S, OT>
+public abstract class AbstractIterativePactTask<S extends RichFunction, OT> extends RegularPactTask<S, OT>
                implements Terminable
 {
        private static final Log log = LogFactory.getLog(AbstractIterativePactTask.class);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
index d7f3b50..7a77cff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
@@ -25,7 +25,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -75,7 +75,7 @@ import org.apache.flink.util.MutableObjectIterator;
  *        The type of the feed-back data set (bulk partial solution / workset). For bulk iterations, {@code Y} is the
  *        same as {@code X}
  */
-public class IterationHeadPactTask<X, Y, S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
+public class IterationHeadPactTask<X, Y, S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT> {
 
        private static final Log log = LogFactory.getLog(IterationHeadPactTask.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
index 25a6149..2a8325c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.runtime.io.network.api.BufferWriter;
 import org.apache.flink.runtime.io.network.channels.EndOfSuperstepEvent;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
@@ -41,7 +41,7 @@ import org.apache.flink.util.Collector;
  * a {@link BlockingBackChannel} for the workset -XOR- a {@link MutableHashTable} for the solution set. In this case
  * this task must be scheduled on the same instance as the head.
  */
-public class IterationIntermediatePactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
+public class IterationIntermediatePactTask<S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT> {
 
        private static final Log log = LogFactory.getLog(IterationIntermediatePactTask.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
index 570630f..942e2f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.iterative.task;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
 import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
@@ -38,7 +38,7 @@ import org.apache.flink.util.Collector;
  * <p/>
  * If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish.
  */
-public class IterationTailPactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT>
+public class IterationTailPactTask<S extends RichFunction, OT> extends AbstractIterativePactTask<S, OT>
                implements PactTaskContext<S, OT> {
 
        private static final Log log = LogFactory.getLog(IterationTailPactTask.class);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
index fe70171..d7d63af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.runtime.operators;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends MatchDriver<IT1, IT2, OT> implements ResettablePactDriver<GenericJoiner<IT1, IT2, OT>, OT> {
+public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends MatchDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
 
        private volatile JoinTaskIterator<IT1, IT2, OT> matchIterator;
        
@@ -110,7 +110,7 @@ public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends M
        @Override
        public void run() throws Exception {
 
-               final GenericJoiner<IT1, IT2, OT> matchStub = this.taskContext.getStub();
+               final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub();
                final Collector<OT> collector = this.taskContext.getOutputCollector();
                
                if (buildSideIndex == 0) {

Reply via email to