[FLINK-701] Refactor Java API to use SAM interfaces. Introduce RichFunction 
stubs for all UDFs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/22b24f20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/22b24f20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/22b24f20

Branch: refs/heads/release-0.6
Commit: 22b24f208de2501311fe504e767526690742e30e
Parents: 9d1c49f
Author: Kostas Tzoumas <[email protected]>
Authored: Fri Jul 18 15:37:19 2014 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Thu Jul 31 17:47:35 2014 +0200

----------------------------------------------------------------------
 .../flink/api/avro/AvroOutputFormatTest.java    |   6 +-
 .../avro/testjar/AvroExternalJarProgram.java    |   8 +-
 .../mapred/example/WordCount.java               |   8 +-
 .../mapreduce/example/WordCount.java            |   8 +-
 .../spargel/java/VertexCentricIteration.java    |   8 +-
 .../examples/SpargelConnectedComponents.java    |   4 +-
 .../spargel/java/examples/SpargelPageRank.java  |   8 +-
 .../SpargelPageRankCountingVertices.java        |  14 +-
 .../SpargelConnectedComponentsITCase.java       |   4 +-
 .../apache/flink/client/testjar/WordCount.java  |   4 +-
 .../flink/compiler/util/NoOpFunction.java       |   4 +-
 .../compiler/BranchingPlansCompilerTest.java    |   5 +-
 .../CachedMatchStrategyCompilerTest.java        |   5 +-
 .../compiler/CoGroupSolutionSetFirstTest.java   |   9 +-
 .../apache/flink/compiler/CompilerTestBase.java |   8 +-
 .../compiler/FeedbackPropertiesMatchTest.java   |  10 +-
 .../compiler/GroupReduceCompilationTest.java    |  20 +-
 .../flink/compiler/IterationsCompilerTest.java  |  20 +-
 .../flink/compiler/ReduceCompilationTest.java   |  10 +-
 .../compiler/UnionPropertyPropagationTest.java  |   4 +-
 .../WorksetIterationsJavaApiCompilerTest.java   |  18 +-
 .../testfunctions/DummyFlatJoinFunction.java    |  33 ++
 .../testfunctions/DummyJoinFunction.java        |  32 -
 .../testfunctions/IdentityGroupReducer.java     |   6 +-
 .../compiler/testfunctions/IdentityMapper.java  |   4 +-
 .../testfunctions/SelectOneReducer.java         |   4 +-
 .../testfunctions/Top1GroupReducer.java         |   6 +-
 .../flink/compiler/util/DummyCrossStub.java     |   9 +-
 .../api/common/functions/AbstractFunction.java  |  76 ---
 .../common/functions/AbstractRichFunction.java  |  76 +++
 .../api/common/functions/CoGroupFunction.java   |  41 ++
 .../api/common/functions/CombineFunction.java   |  31 +
 .../api/common/functions/CrossFunction.java     |  42 ++
 .../api/common/functions/FilterFunction.java    |  33 ++
 .../common/functions/FlatCombineFunction.java   |  33 ++
 .../api/common/functions/FlatJoinFunction.java  |  30 +
 .../api/common/functions/FlatMapFunction.java   |  45 ++
 .../flink/api/common/functions/Function.java    |  75 +--
 .../api/common/functions/GenericCoGrouper.java  |  40 --
 .../common/functions/GenericCollectorMap.java   |   2 +-
 .../api/common/functions/GenericCombine.java    |  32 -
 .../api/common/functions/GenericCrosser.java    |  41 --
 .../api/common/functions/GenericFilter.java     |  33 --
 .../api/common/functions/GenericFlatMap.java    |  42 --
 .../common/functions/GenericGroupReduce.java    |  44 --
 .../api/common/functions/GenericJoiner.java     |  28 -
 .../flink/api/common/functions/GenericMap.java  |  30 -
 .../api/common/functions/GenericReduce.java     |  26 -
 .../common/functions/GroupReduceFunction.java   |  45 ++
 .../api/common/functions/JoinFunction.java      |  28 +
 .../flink/api/common/functions/MapFunction.java |  38 ++
 .../api/common/functions/ReduceFunction.java    |  39 ++
 .../api/common/functions/RichFunction.java      |  97 ++++
 .../api/common/functions/RuntimeContext.java    |   2 +-
 .../common/functions/util/FunctionUtils.java    |  81 +++
 .../flink/api/common/operators/Union.java       |   6 +-
 .../operators/base/BulkIterationBase.java       |   8 +-
 .../operators/base/CoGroupOperatorBase.java     |   6 +-
 .../operators/base/CrossOperatorBase.java       |   6 +-
 .../operators/base/DeltaIterationBase.java      |   6 +-
 .../operators/base/FilterOperatorBase.java      |   6 +-
 .../operators/base/FlatMapOperatorBase.java     |   6 +-
 .../operators/base/GroupReduceOperatorBase.java |  17 +-
 .../common/operators/base/JoinOperatorBase.java |   6 +-
 .../common/operators/base/MapOperatorBase.java  |   4 +-
 .../operators/base/ReduceOperatorBase.java      |   6 +-
 .../api/common/operators/util/OperatorUtil.java |  18 +-
 .../common/operators/util/OperatorUtilTest.java |  21 +-
 flink-examples/flink-java-examples/pom.xml      |   1 -
 .../flink/example/java/clustering/KMeans.java   |  17 +-
 .../example/java/graph/ConnectedComponents.java |  24 +-
 .../example/java/graph/EnumTrianglesBasic.java  |  14 +-
 .../example/java/graph/EnumTrianglesOpt.java    |  28 +-
 .../flink/example/java/graph/PageRankBasic.java |  18 +-
 .../java/graph/TransitiveClosureNaive.java      |   4 +-
 .../flink/example/java/ml/LinearRegression.java |  15 +-
 .../relational/EmptyFieldsCountAccumulator.java |   4 +-
 .../java/relational/RelationalQuery.java        |   2 +-
 .../example/java/relational/TPCHQuery10.java    |   4 +-
 .../example/java/relational/TPCHQuery3.java     |   4 +-
 .../example/java/relational/WebLogAnalysis.java |  12 +-
 .../flink/example/java/wordcount/WordCount.java |   4 +-
 .../example/java/wordcount/WordCountPOJO.java   |   6 +-
 flink-examples/pom.xml                          |   1 -
 .../java/org/apache/flink/api/java/DataSet.java |  93 +--
 .../apache/flink/api/java/DeltaIteration.java   |   2 +-
 .../flink/api/java/ExecutionEnvironment.java    |   6 +-
 .../apache/flink/api/java/IterativeDataSet.java |   2 +-
 .../api/java/functions/CoGroupFunction.java     |  74 ---
 .../flink/api/java/functions/CrossFunction.java |  76 ---
 .../api/java/functions/FilterFunction.java      |  57 --
 .../api/java/functions/FlatMapFunction.java     |  59 --
 .../api/java/functions/FlatMapIterator.java     |   4 +-
 .../api/java/functions/FunctionAnnotation.java  |  16 +-
 .../api/java/functions/GroupReduceFunction.java | 114 ----
 .../api/java/functions/GroupReduceIterator.java |   2 +-
 .../flink/api/java/functions/JoinFunction.java  |  93 ---
 .../flink/api/java/functions/MapFunction.java   |  59 --
 .../api/java/functions/ReduceFunction.java      |  63 --
 .../api/java/functions/RichCoGroupFunction.java |  74 +++
 .../api/java/functions/RichCrossFunction.java   |  67 +++
 .../api/java/functions/RichFilterFunction.java  |  57 ++
 .../java/functions/RichFlatCombineFunction.java |  34 ++
 .../java/functions/RichFlatJoinFunction.java    |  75 +++
 .../api/java/functions/RichFlatMapFunction.java |  59 ++
 .../java/functions/RichGroupReduceFunction.java | 114 ++++
 .../api/java/functions/RichJoinFunction.java    |  31 +
 .../api/java/functions/RichMapFunction.java     |  59 ++
 .../api/java/functions/RichReduceFunction.java  |  63 ++
 .../UnsupportedLambdaExpressionException.java   |  30 +
 .../api/java/operators/AggregateOperator.java   |  20 +-
 .../api/java/operators/CoGroupOperator.java     |  52 +-
 .../flink/api/java/operators/CrossOperator.java |  18 +-
 .../api/java/operators/DistinctOperator.java    |  44 +-
 .../api/java/operators/FilterOperator.java      |   6 +-
 .../api/java/operators/FlatMapOperator.java     |   7 +-
 .../api/java/operators/GroupReduceOperator.java | 224 +++++++
 .../flink/api/java/operators/Grouping.java      |   4 +-
 .../flink/api/java/operators/JoinOperator.java  | 163 ++++--
 .../flink/api/java/operators/MapOperator.java   |  10 +-
 .../api/java/operators/ProjectOperator.java     |   4 +-
 .../api/java/operators/ReduceGroupOperator.java | 215 -------
 .../api/java/operators/ReduceOperator.java      |  19 +-
 .../java/operators/SingleInputUdfOperator.java  |   4 +-
 .../api/java/operators/SortedGrouping.java      |  20 +-
 .../api/java/operators/TwoInputUdfOperator.java |   4 +-
 .../flink/api/java/operators/UdfOperator.java   |  10 +-
 .../api/java/operators/UnsortedGrouping.java    |  24 +-
 .../translation/KeyExtractingMapper.java        |   4 +-
 .../translation/KeyRemovingMapper.java          |   4 +-
 .../translation/PlanFilterOperator.java         |   8 +-
 .../translation/PlanProjectOperator.java        |  10 +-
 .../PlanUnwrappingCoGroupOperator.java          |  13 +-
 .../translation/PlanUnwrappingJoinOperator.java |  28 +-
 .../PlanUnwrappingReduceGroupOperator.java      |  21 +-
 .../PlanUnwrappingReduceOperator.java           |   7 +-
 .../translation/TupleKeyExtractingMapper.java   |   4 +-
 .../operators/translation/WrappingFunction.java |  22 +-
 .../java/record/functions/CoGroupFunction.java  |   5 +-
 .../java/record/functions/CrossFunction.java    |  19 +-
 .../api/java/record/functions/JoinFunction.java |   8 +-
 .../api/java/record/functions/MapFunction.java  |   4 +-
 .../java/record/functions/ReduceFunction.java   |   8 +-
 .../flink/api/java/typeutils/TypeExtractor.java |  66 ++-
 .../java/typeutils/runtime/TupleComparator.java |   2 +-
 .../SemanticPropertiesTranslationTest.java      |  12 +-
 .../DeltaIterationTranslationTest.java          |  26 +-
 .../translation/ReduceTranslationTests.java     |  10 +-
 .../java/type/extractor/TypeExtractorTest.java  | 126 ++--
 flink-java8-tests/pom.xml                       | 145 +++++
 .../javaApiOperators/lambdas/CoGroupITCase.java |  70 +++
 .../javaApiOperators/lambdas/CrossITCase.java   |  59 ++
 .../javaApiOperators/lambdas/FilterITCase.java  | 142 +++++
 .../lambdas/FlatJoinITCase.java                 |  58 ++
 .../javaApiOperators/lambdas/FlatMapITCase.java |  46 ++
 .../lambdas/GroupReduceITCase.java              |  84 +++
 .../javaApiOperators/lambdas/JoinITCase.java    |  58 ++
 .../javaApiOperators/lambdas/MapITCase.java     |  48 ++
 .../javaApiOperators/lambdas/ReduceITCase.java  | 160 +++++
 .../task/AbstractIterativePactTask.java         |   4 +-
 .../iterative/task/IterationHeadPactTask.java   |   4 +-
 .../task/IterationIntermediatePactTask.java     |   4 +-
 .../iterative/task/IterationTailPactTask.java   |   4 +-
 .../AbstractCachedBuildSideMatchDriver.java     |   6 +-
 .../runtime/operators/AllGroupReduceDriver.java |  24 +-
 .../runtime/operators/AllReduceDriver.java      |  16 +-
 .../flink/runtime/operators/CoGroupDriver.java  |  14 +-
 .../CoGroupWithSolutionSetFirstDriver.java      |  14 +-
 .../CoGroupWithSolutionSetSecondDriver.java     |  14 +-
 .../flink/runtime/operators/CrossDriver.java    |  32 +-
 .../flink/runtime/operators/FlatMapDriver.java  |  16 +-
 .../operators/GroupReduceCombineDriver.java     |  14 +-
 .../runtime/operators/GroupReduceDriver.java    |  16 +-
 .../JoinWithSolutionSetFirstDriver.java         |  14 +-
 .../JoinWithSolutionSetSecondDriver.java        |  14 +-
 .../flink/runtime/operators/MapDriver.java      |  16 +-
 .../flink/runtime/operators/MatchDriver.java    |  16 +-
 .../flink/runtime/operators/NoOpDriver.java     |  10 +-
 .../runtime/operators/ReduceCombineDriver.java  |  16 +-
 .../flink/runtime/operators/ReduceDriver.java   |  16 +-
 .../runtime/operators/RegularPactTask.java      |  42 +-
 .../chaining/ChainedCollectorMapDriver.java     |   4 +-
 .../chaining/ChainedFlatMapDriver.java          |  17 +-
 .../operators/chaining/ChainedMapDriver.java    |  14 +-
 .../ChainedTerminationCriterionDriver.java      |   4 +-
 .../SynchronousChainedCombineDriver.java        |  13 +-
 .../hash/BuildFirstHashMatchIterator.java       |   4 +-
 .../hash/BuildSecondHashMatchIterator.java      |   4 +-
 .../sort/CombiningUnilateralSortMerger.java     |  19 +-
 .../operators/sort/MergeMatchIterator.java      |  14 +-
 .../operators/util/JoinTaskIterator.java        |   4 +-
 .../runtime/operators/CachedMatchTaskTest.java  |   7 +-
 .../operators/CoGroupTaskExternalITCase.java    |   9 +-
 .../runtime/operators/CoGroupTaskTest.java      |  11 +-
 .../operators/CombineTaskExternalITCase.java    |   6 +-
 .../runtime/operators/CombineTaskTest.java      |   6 +-
 .../operators/CrossTaskExternalITCase.java      |   6 +-
 .../flink/runtime/operators/CrossTaskTest.java  |  22 +-
 .../operators/MatchTaskExternalITCase.java      |   6 +-
 .../flink/runtime/operators/MatchTaskTest.java  |  14 +-
 .../operators/ReduceTaskExternalITCase.java     |   6 +-
 .../flink/runtime/operators/ReduceTaskTest.java |   6 +-
 .../drivers/AllGroupReduceDriverTest.java       |  20 +-
 .../operators/drivers/AllReduceDriverTest.java  |  32 +-
 .../drivers/GroupReduceDriverTest.java          |  20 +-
 .../drivers/ReduceCombineDriverTest.java        |  32 +-
 .../operators/drivers/ReduceDriverTest.java     |  32 +-
 .../operators/hash/HashMatchIteratorITCase.java |  18 +-
 .../sort/SortMergeMatchIteratorITCase.java      |   4 +-
 .../operators/testutils/DriverTestBase.java     |   8 +-
 .../operators/testutils/TaskTestBase.java       |   6 +-
 .../operators/util/HashVsSortMiniBenchmark.java |   4 +-
 .../api/scala/functions/CrossFunction.scala     |  28 +-
 .../api/scala/operators/CrossOperator.scala     |  69 +--
 .../api/scala/operators/IterateOperators.scala  |   2 +-
 .../test/compiler/util/CompilerTestBase.java    |   4 +-
 .../test/compiler/util/OperatorResolver.java    |   4 +-
 flink-tests/pom.xml                             |   2 +-
 ...ultipleJoinsWithSolutionSetCompilerTest.java |  14 +-
 .../BulkIterationWithAllReducerITCase.java      |   4 +-
 .../CoGroupConnectedComponentsSecondITCase.java |  12 +-
 .../DependencyConnectedComponentsITCase.java    |  23 +-
 .../aggregators/AggregatorsITCase.java          |  20 +-
 ...nentsWithParametrizableAggregatorITCase.java |  15 +-
 ...entsWithParametrizableConvergenceITCase.java |  15 +-
 .../CustomCompensatableDotProductCoGroup.java   |   6 +-
 .../CustomCompensatableDotProductMatch.java     |   8 +-
 .../CustomCompensatingMap.java                  |   4 +-
 .../CustomRankCombiner.java                     |  10 +-
 .../test/javaApiOperators/CoGroupITCase.java    |  17 +-
 .../test/javaApiOperators/CrossITCase.java      |  15 +-
 .../test/javaApiOperators/DistinctITCase.java   |   4 +-
 .../test/javaApiOperators/FilterITCase.java     |   9 +-
 .../test/javaApiOperators/FlatMapITCase.java    |   5 +-
 .../javaApiOperators/GroupReduceITCase.java     | 578 ++++++++++---------
 .../flink/test/javaApiOperators/JoinITCase.java | 105 ++--
 .../flink/test/javaApiOperators/MapITCase.java  |   7 +-
 .../test/javaApiOperators/ReduceITCase.java     |  19 +-
 .../test/javaApiOperators/UnionITCase.java      |   4 +-
 .../flink/test/operators/CrossITCase.java       |  49 +-
 .../recordJobs/kmeans/udfs/ComputeDistance.java |  16 +-
 .../test/recordJobs/relational/TPCHQuery3.java  |   2 +-
 .../flink/test/util/testjar/KMeansForTest.java  |  16 +-
 pom.xml                                         |   3 +
 244 files changed, 4106 insertions(+), 2872 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
 
b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
index 637a5e9..386f318 100644
--- 
a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
+++ 
b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
@@ -30,7 +30,7 @@ import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.io.AvroOutputFormat;
 import org.apache.flink.api.java.record.io.avro.example.User;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -125,7 +125,7 @@ public class AvroOutputFormatTest extends 
JavaProgramTestBase {
        }
 
 
-       public final static class ConvertToUser extends 
MapFunction<Tuple3<String, Integer, String>, User> {
+       public final static class ConvertToUser extends 
RichMapFunction<Tuple3<String, Integer, String>, User> {
 
                @Override
                public User map(Tuple3<String, Integer, String> value) throws 
Exception {
@@ -133,7 +133,7 @@ public class AvroOutputFormatTest extends 
JavaProgramTestBase {
                }
        }
 
-       public final static class ConvertToReflective extends MapFunction<User, 
ReflectiveUser> {
+       public final static class ConvertToReflective extends 
RichMapFunction<User, ReflectiveUser> {
 
                @Override
                public ReflectiveUser map(User value) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
 
b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
index 146c72b..75b7da6 100644
--- 
a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
+++ 
b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
@@ -40,8 +40,8 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.flink.api.avro.AvroBaseValue;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -141,7 +141,7 @@ public class AvroExternalJarProgram  {
        
        // 
--------------------------------------------------------------------------------------------
        
-       public static final class NameExtractor extends MapFunction<MyUser, 
Tuple2<String, MyUser>> {
+       public static final class NameExtractor extends RichMapFunction<MyUser, 
Tuple2<String, MyUser>> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -151,7 +151,7 @@ public class AvroExternalJarProgram  {
                }
        }
        
-       public static final class NameGrouper extends 
ReduceFunction<Tuple2<String, MyUser>> {
+       public static final class NameGrouper extends 
RichReduceFunction<Tuple2<String, MyUser>> {
                private static final long serialVersionUID = 1L;
 
                @Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
 
b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
index 4e8ffa9..ba09e77 100644
--- 
a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
+++ 
b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.hadoopcompatibility.mapred.example;
 
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -28,8 +29,7 @@ import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
 import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
@@ -88,7 +88,7 @@ public class WordCount {
        /**
         * Splits a line into words and converts Hadoop Writables into normal 
Java data types.
         */
-       public static final class Tokenizer extends 
FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
+       public static final class Tokenizer extends 
RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
                
                @Override
                public void flatMap(Tuple2<LongWritable, Text> value, 
Collector<Tuple2<String, Integer>> out) {
@@ -108,7 +108,7 @@ public class WordCount {
        /**
         * Converts Java data types to Hadoop Writables.
         */
-       public static final class HadoopDatatypeMapper extends 
MapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
+       public static final class HadoopDatatypeMapper extends 
RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
                
                @Override
                public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> 
value) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
 
b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
index 36ea378..c00a14a 100644
--- 
a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
+++ 
b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -19,8 +19,8 @@
 package org.apache.flink.hadoopcompatibility.mapreduce.example;
 
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 import org.apache.hadoop.fs.Path;
@@ -89,7 +89,7 @@ public class WordCount {
        /**
         * Splits a line into words and converts Hadoop Writables into normal 
Java data types.
         */
-       public static final class Tokenizer extends 
FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
+       public static final class Tokenizer extends 
RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
                
                @Override
                public void flatMap(Tuple2<LongWritable, Text> value, 
Collector<Tuple2<String, Integer>> out) {
@@ -109,7 +109,7 @@ public class WordCount {
        /**
         * Converts Java data types to Hadoop Writables.
         */
-       public static final class HadoopDatatypeMapper extends 
MapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
+       public static final class HadoopDatatypeMapper extends 
RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
                
                @Override
                public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> 
value) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
index bb84cea..65be2f8 100644
--- 
a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
+++ 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
@@ -29,7 +29,7 @@ import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.DeltaIteration;
-import org.apache.flink.api.java.functions.CoGroupFunction;
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
 import org.apache.flink.api.java.operators.CoGroupOperator;
 import org.apache.flink.api.java.operators.CustomUnaryOperation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -393,7 +393,7 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey>, Ver
        // 
--------------------------------------------------------------------------------------------
        
        private static final class VertexUpdateUdf<VertexKey extends 
Comparable<VertexKey>, VertexValue, Message> 
-               extends CoGroupFunction<Tuple2<VertexKey, Message>, 
Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
+               extends RichCoGroupFunction<Tuple2<VertexKey, Message>, 
Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>>
                implements ResultTypeQueryable<Tuple2<VertexKey, VertexValue>>
        {
                private static final long serialVersionUID = 1L;
@@ -463,7 +463,7 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey>, Ver
         * UDF that encapsulates the message sending function for graphs where 
the edges have no associated values.
         */
        private static final class MessagingUdfNoEdgeValues<VertexKey extends 
Comparable<VertexKey>, VertexValue, Message> 
-               extends CoGroupFunction<Tuple2<VertexKey, VertexKey>, 
Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
+               extends RichCoGroupFunction<Tuple2<VertexKey, VertexKey>, 
Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
                implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
        {
                private static final long serialVersionUID = 1L;
@@ -516,7 +516,7 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey>, Ver
         * UDF that encapsulates the message sending function for graphs where 
the edges have an associated value.
         */
        private static final class MessagingUdfWithEdgeValues<VertexKey extends 
Comparable<VertexKey>, VertexValue, Message, EdgeValue> 
-               extends CoGroupFunction<Tuple3<VertexKey, VertexKey, 
EdgeValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
+               extends RichCoGroupFunction<Tuple3<VertexKey, VertexKey, 
EdgeValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
                implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
        {
                private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
index ea90feb..a4ba6fa 100644
--- 
a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
+++ 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.spargel.java.examples;
 
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.spargel.java.MessageIterator;
 import org.apache.flink.spargel.java.MessagingFunction;
@@ -70,7 +70,7 @@ public class SpargelConnectedComponents {
         * A map function that takes a Long value and creates a 2-tuple out of 
it:
         * <pre>(Long value) -> (value, value)</pre>
         */
-       public static final class IdAssigner extends MapFunction<Long, 
Tuple2<Long, Long>> {
+       public static final class IdAssigner extends RichMapFunction<Long, 
Tuple2<Long, Long>> {
                @Override
                public Tuple2<Long, Long> map(Long value) {
                        return new Tuple2<Long, Long>(value, value);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
index c7fbaaa..9dfc327 100644
--- 
a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
+++ 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.spargel.java.examples;
 
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.spargel.java.MessageIterator;
@@ -48,7 +48,7 @@ public class SpargelPageRank {
                
                // enumerate some sample edges and assign an initial uniform 
probability (rank)
                DataSet<Tuple2<Long, Double>> intialRanks = 
env.generateSequence(1, numVertices)
-                                                               .map(new 
MapFunction<Long, Tuple2<Long, Double>>() {
+                                                               .map(new 
RichMapFunction<Long, Tuple2<Long, Double>>() {
                                                                        public 
Tuple2<Long, Double> map(Long value) {
                                                                                
return new Tuple2<Long, Double>(value, 1.0/numVertices);
                                                                        }
@@ -56,7 +56,7 @@ public class SpargelPageRank {
                
                // generate some random edges. the transition probability on 
each edge is 1/num-out-edges of the source vertex
                DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = 
env.generateSequence(1, numVertices)
-                                                               .flatMap(new 
FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
+                                                               .flatMap(new 
RichFlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
                                                                        public 
void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
                                                                                
int numOutEdges = (int) (Math.random() * (numVertices / 2));
                                                                                
for (int i = 0; i < numOutEdges; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
index 34c9ad8..43c0b84 100644
--- 
a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
+++ 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.spargel.java.examples;
 
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
@@ -53,7 +53,7 @@ public class SpargelPageRankCountingVertices {
                
                // generate some random edges. the transition probability on 
each edge is 1/num-out-edges of the source vertex
                DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = 
env.generateSequence(1, NUM_VERTICES)
-                                                               .flatMap(new 
FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
+                                                               .flatMap(new 
RichFlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
                                                                        public 
void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
                                                                                
int numOutEdges = (int) (Math.random() * (NUM_VERTICES / 2));
                                                                                
for (int i = 0; i < numOutEdges; i++) {
@@ -67,12 +67,12 @@ public class SpargelPageRankCountingVertices {
                
                // count the number of vertices
                DataSet<Long> count = vertices
-                       .map(new MapFunction<Long, Long>() {
+                       .map(new RichMapFunction<Long, Long>() {
                                public Long map(Long value) {
                                        return 1L;
                                }
                        })
-                       .reduce(new ReduceFunction<Long>() {
+                       .reduce(new RichReduceFunction<Long>() {
                                public Long reduce(Long value1, Long value2) {
                                        return value1 + value2;
                                }
@@ -80,7 +80,7 @@ public class SpargelPageRankCountingVertices {
                
                // enumerate some sample edges and assign an initial uniform 
probability (rank)
                DataSet<Tuple2<Long, Double>> intialRanks = vertices
-                       .map(new MapFunction<Long, Tuple2<Long, Double>>() {
+                       .map(new RichMapFunction<Long, Tuple2<Long, Double>>() {
                                
                                private long numVertices;
                                

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
 
b/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
index a34f2db..16b004c 100644
--- 
a/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
+++ 
b/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.spargel;
 
 import java.io.BufferedReader;
 
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -72,7 +72,7 @@ public class SpargelConnectedComponentsITCase extends 
JavaProgramTestBase {
                }
        }
        
-       public static final class EdgeParser extends MapFunction<String, 
Tuple2<Long, Long>> {
+       public static final class EdgeParser extends RichMapFunction<String, 
Tuple2<Long, Long>> {
                public Tuple2<Long, Long> map(String value) {
                        String[] nums = value.split(" ");
                        return new Tuple2<Long, Long>(Long.parseLong(nums[0]), 
Long.parseLong(nums[1]));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java 
b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
index 7320b7b..2b64b84 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
@@ -19,7 +19,7 @@
 package org.apache.flink.client.testjar;
 
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
@@ -75,7 +75,7 @@ public class WordCount {
         * FlatMapFunction. The function takes a line (String) and splits it 
into 
         * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
         */
-       public static final class Tokenizer extends FlatMapFunction<String, 
Tuple2<String, Integer>> {
+       public static final class Tokenizer extends RichFlatMapFunction<String, 
Tuple2<String, Integer>> {
 
                private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java 
b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java
index 9f10be5..6eccc8a 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/util/NoOpFunction.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.compiler.util;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 
 
-public class NoOpFunction extends AbstractFunction {
+public class NoOpFunction extends AbstractRichFunction {
        private static final long serialVersionUID = 1L;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
index 571f4e4..10fa34f 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.IterativeDataSet;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
 import org.apache.flink.api.java.record.operators.BulkIteration;
 import org.apache.flink.api.java.record.operators.CoGroupOperator;
 import org.apache.flink.api.java.record.operators.CrossOperator;
@@ -40,7 +40,6 @@ import 
org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.JoinOperator;
 import org.apache.flink.api.java.record.operators.MapOperator;
 import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.compiler.PactCompiler;
 import org.apache.flink.compiler.plan.OptimizedPlan;
 import org.apache.flink.compiler.plan.SinkPlanNode;
 import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
@@ -959,7 +958,7 @@ public class BranchingPlansCompilerTest extends 
CompilerTestBase {
                result1.join(result2)
                                .where(new IdentityKeyExtractor<String>())
                                .equalTo(new IdentityKeyExtractor<String>())
-                               .with(new JoinFunction<String, String, 
String>() {
+                               .with(new RichJoinFunction<String, String, 
String>() {
                                        @Override
                                        public String join(String first, String 
second) {
                                                return null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
index 55c53f0..8226dbf 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/CachedMatchStrategyCompilerTest.java
@@ -28,9 +28,8 @@ import 
org.apache.flink.api.common.operators.base.GenericDataSourceBase;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.IterativeDataSet;
-import org.apache.flink.api.java.functions.JoinFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.compiler.PactCompiler;
 import org.apache.flink.compiler.dag.TempMode;
 import org.apache.flink.compiler.plan.DualInputPlanNode;
 import org.apache.flink.compiler.plan.OptimizedPlan;
@@ -256,7 +255,7 @@ public class CachedMatchStrategyCompilerTest extends 
CompilerTestBase {
                
        }
        
-       private static class DummyJoiner extends JoinFunction<Tuple3<Long, 
Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> {
+       private static class DummyJoiner extends RichJoinFunction<Tuple3<Long, 
Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> {
 
                @Override
                public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> 
first,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
index 1c30545..3624d86 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/CoGroupSolutionSetFirstTest.java
@@ -21,16 +21,15 @@ package org.apache.flink.compiler;
 
 import java.util.Iterator;
 
+import org.apache.flink.api.java.functions.RichCoGroupFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.CoGroupFunction;
-import org.apache.flink.api.java.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.plan.DualInputPlanNode;
 import org.apache.flink.compiler.plan.OptimizedPlan;
@@ -43,13 +42,13 @@ import org.apache.flink.util.Visitor;
 @SuppressWarnings("serial")
 public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
        
-       public static class SimpleCGroup extends 
CoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> {
+       public static class SimpleCGroup extends 
RichCoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> {
                @Override
                public void coGroup(Iterator<Tuple1<Integer>> first, 
Iterator<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) throws 
Exception {
                }
        }
 
-       public static class SimpleMap extends MapFunction<Tuple1<Integer>, 
Tuple1<Integer>> {
+       public static class SimpleMap extends RichMapFunction<Tuple1<Integer>, 
Tuple1<Integer>> {
                @Override
                public Tuple1<Integer> map(Tuple1<Integer> value) throws 
Exception {
                        return null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java 
b/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
index ff4d6b0..b2c163b 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
@@ -27,14 +27,12 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
 import org.apache.flink.api.java.record.operators.BulkIteration;
 import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.compiler.DataStatistics;
-import org.apache.flink.compiler.PactCompiler;
 import org.apache.flink.compiler.costs.DefaultCostEstimator;
 import org.apache.flink.compiler.plan.OptimizedPlan;
 import org.apache.flink.compiler.plan.PlanNode;
@@ -181,7 +179,7 @@ public abstract class CompilerTestBase implements 
java.io.Serializable {
                }
                
                @SuppressWarnings("unchecked")
-               public <T extends PlanNode> T getNode(String name, Class<? 
extends Function> stubClass) {
+               public <T extends PlanNode> T getNode(String name, Class<? 
extends RichFunction> stubClass) {
                        List<PlanNode> nodes = this.map.get(name);
                        if (nodes == null || nodes.isEmpty()) {
                                throw new RuntimeException("No node found with 
the given name and stub class.");
@@ -243,7 +241,7 @@ public abstract class CompilerTestBase implements 
java.io.Serializable {
                }
                
                @SuppressWarnings("unchecked")
-               public <T extends Operator<?>> T getNode(String name, Class<? 
extends Function> stubClass) {
+               public <T extends Operator<?>> T getNode(String name, Class<? 
extends RichFunction> stubClass) {
                        List<Operator<?>> nodes = this.map.get(name);
                        if (nodes == null || nodes.isEmpty()) {
                                throw new RuntimeException("No node found with 
the given name and stub class.");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
index d50a7d6..0fbf072 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
@@ -22,8 +22,8 @@ package org.apache.flink.compiler;
 import static 
org.apache.flink.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport.*;
 import static org.junit.Assert.*;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.OperatorInformation;
 import org.apache.flink.api.common.operators.Order;
@@ -47,7 +47,7 @@ import org.apache.flink.compiler.plan.DualInputPlanNode;
 import org.apache.flink.compiler.plan.SingleInputPlanNode;
 import org.apache.flink.compiler.plan.SourcePlanNode;
 import 
org.apache.flink.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
-import org.apache.flink.compiler.testfunctions.DummyJoinFunction;
+import org.apache.flink.compiler.testfunctions.DummyFlatJoinFunction;
 import org.apache.flink.compiler.testfunctions.IdentityMapper;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.operators.DriverStrategy;
@@ -1426,10 +1426,10 @@ public class FeedbackPropertiesMatchTest {
        }
        
        private static final MapNode getMapNode() {
-               return new MapNode(new MapOperatorBase<String, String, 
GenericMap<String,String>>(new IdentityMapper<String>(), new 
UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO), "map op"));
+               return new MapNode(new MapOperatorBase<String, String, 
MapFunction<String,String>>(new IdentityMapper<String>(), new 
UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO), "map op"));
        }
        
        private static final MatchNode getJoinNode() {
-               return new MatchNode(new JoinOperatorBase<String, String, 
String, GenericJoiner<String, String, String>>(new DummyJoinFunction<String>(), 
new BinaryOperatorInformation<String, String, 
String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op"));
+               return new MatchNode(new JoinOperatorBase<String, String, 
String, FlatJoinFunction<String, String, String>>(new 
DummyFlatJoinFunction<String>(), new BinaryOperatorInformation<String, String, 
String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op"));
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
index f3e513a..b9cc769 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
@@ -23,9 +23,9 @@ import java.util.Iterator;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.ReduceGroupOperator;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.util.Collector;
@@ -50,7 +50,7 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                        
                        DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 
0.5).name("source");
                        
-                       data.reduceGroup(new GroupReduceFunction<Double, 
Double>() {
+                       data.reduceGroup(new RichGroupReduceFunction<Double, 
Double>() {
                                public void reduce(Iterator<Double> values, 
Collector<Double> out) {}
                        }).name("reducer")
                        .print().name("sink");
@@ -94,7 +94,7 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                        
                        DataSet<Long> data = env.generateSequence(1, 
8000000).name("source");
                        
-                       ReduceGroupOperator<Long, Long> reduced = 
data.reduceGroup(new GroupReduceFunction<Long, Long>() {
+                       GroupReduceOperator<Long, Long> reduced = 
data.reduceGroup(new RichGroupReduceFunction<Long, Long>() {
                                public void reduce(Iterator<Long> values, 
Collector<Long> out) {}
                        }).name("reducer");
                        
@@ -147,7 +147,7 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                        
                        data
                                .groupBy(1)
-                               .reduceGroup(new 
GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+                               .reduceGroup(new 
RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
                                public void reduce(Iterator<Tuple2<String, 
Double>> values, Collector<Tuple2<String, Double>> out) {}
                        }).name("reducer")
                        .print().name("sink");
@@ -194,9 +194,9 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                        DataSet<Tuple2<String, Double>> data = 
env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
                                .name("source").setParallelism(6);
                        
-                       ReduceGroupOperator<Tuple2<String, Double>, 
Tuple2<String, Double>> reduced = data
+                       GroupReduceOperator<Tuple2<String, Double>, 
Tuple2<String, Double>> reduced = data
                                        .groupBy(1)
-                                       .reduceGroup(new 
GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+                                       .reduceGroup(new 
RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
                                public void reduce(Iterator<Tuple2<String, 
Double>> values, Collector<Tuple2<String, Double>> out) {}
                        }).name("reducer");
                        
@@ -255,7 +255,7 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                                .groupBy(new KeySelector<Tuple2<String,Double>, 
String>() { 
                                        public String getKey(Tuple2<String, 
Double> value) { return value.f0; }
                                })
-                               .reduceGroup(new 
GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+                               .reduceGroup(new 
RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
                                public void reduce(Iterator<Tuple2<String, 
Double>> values, Collector<Tuple2<String, Double>> out) {}
                        }).name("reducer")
                        .print().name("sink");
@@ -309,11 +309,11 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                        DataSet<Tuple2<String, Double>> data = 
env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
                                .name("source").setParallelism(6);
                        
-                       ReduceGroupOperator<Tuple2<String, Double>, 
Tuple2<String, Double>> reduced = data
+                       GroupReduceOperator<Tuple2<String, Double>, 
Tuple2<String, Double>> reduced = data
                                .groupBy(new KeySelector<Tuple2<String,Double>, 
String>() { 
                                        public String getKey(Tuple2<String, 
Double> value) { return value.f0; }
                                })
-                               .reduceGroup(new 
GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
+                               .reduceGroup(new 
RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
                                public void reduce(Iterator<Tuple2<String, 
Double>> values, Collector<Tuple2<String, Double>> out) {}
                        }).name("reducer");
                        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
index d3c3e3f..8fc4324 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.compiler;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.junit.Test;
 
 import java.util.Iterator;
@@ -31,10 +32,9 @@ import org.apache.flink.api.java.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.IterativeDataSet;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -261,7 +261,7 @@ public class IterationsCompilerTest extends 
CompilerTestBase {
                
        }
        
-       public static final class Join222 extends JoinFunction<Tuple2<Long, 
Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+       public static final class Join222 extends RichJoinFunction<Tuple2<Long, 
Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
                @Override
                public Tuple2<Long, Long> join(Tuple2<Long, Long> 
vertexWithComponent, Tuple2<Long, Long> edge) {
@@ -269,13 +269,13 @@ public class IterationsCompilerTest extends 
CompilerTestBase {
                }
        }
        
-       public static final class FlatMapJoin extends 
FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, Tuple2<Long, 
Long>> {
+       public static final class FlatMapJoin extends 
RichFlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>, 
Tuple2<Long, Long>> {
                
                @Override
                public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, 
Long>> value, Collector<Tuple2<Long, Long>> out) {}
        }
        
-       public static final class DummyMap extends MapFunction<Tuple2<Long, 
Long>, Tuple2<Long, Long>> {
+       public static final class DummyMap extends RichMapFunction<Tuple2<Long, 
Long>, Tuple2<Long, Long>> {
 
                @Override
                public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws 
Exception {
@@ -284,14 +284,14 @@ public class IterationsCompilerTest extends 
CompilerTestBase {
        }
        
        @ConstantFields("0")
-       public static final class Reduce101 extends 
GroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
+       public static final class Reduce101 extends 
RichGroupReduceFunction<Tuple1<Long>, Tuple1<Long>> {
                
                @Override
                public void reduce(Iterator<Tuple1<Long>> values, 
Collector<Tuple1<Long>> out) {}
        }
        
        @ConstantFields("0")
-       public static final class DuplicateValue extends 
MapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
+       public static final class DuplicateValue extends 
RichMapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
 
                @Override
                public Tuple2<Long, Long> map(Tuple1<Long> value) throws 
Exception {
@@ -299,7 +299,7 @@ public class IterationsCompilerTest extends 
CompilerTestBase {
                }
        }
        
-       public static final class DuplicateValueScalar<T> extends 
MapFunction<T, Tuple2<T, T>> {
+       public static final class DuplicateValueScalar<T> extends 
RichMapFunction<T, Tuple2<T, T>> {
 
                @Override
                public Tuple2<T, T> map(T value) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
index a654872..fb8ae8d 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
@@ -22,7 +22,7 @@ package org.apache.flink.compiler;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.junit.Test;
 import org.apache.flink.api.java.DataSet;
@@ -46,7 +46,7 @@ public class ReduceCompilationTest extends CompilerTestBase 
implements java.io.S
                        
                        DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 
0.5).name("source");
                        
-                       data.reduce(new ReduceFunction<Double>() {
+                       data.reduce(new RichReduceFunction<Double>() {
                                
                                @Override
                                public Double reduce(Double value1, Double 
value2){
@@ -91,7 +91,7 @@ public class ReduceCompilationTest extends CompilerTestBase 
implements java.io.S
                        
                        DataSet<Long> data = env.generateSequence(1, 
8000000).name("source");
                        
-                       data.reduce(new ReduceFunction<Long>() {
+                       data.reduce(new RichReduceFunction<Long>() {
                                
                                @Override
                                public Long reduce(Long value1, Long value2){
@@ -145,7 +145,7 @@ public class ReduceCompilationTest extends CompilerTestBase 
implements java.io.S
                        
                        data
                                .groupBy(1)
-                               .reduce(new 
ReduceFunction<Tuple2<String,Double>>() {
+                               .reduce(new 
RichReduceFunction<Tuple2<String,Double>>() {
                                @Override
                                public Tuple2<String, Double> 
reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2){
                                        return null;
@@ -205,7 +205,7 @@ public class ReduceCompilationTest extends CompilerTestBase 
implements java.io.S
                                .groupBy(new KeySelector<Tuple2<String,Double>, 
String>() { 
                                        public String getKey(Tuple2<String, 
Double> value) { return value.f0; }
                                })
-                               .reduce(new 
ReduceFunction<Tuple2<String,Double>>() {
+                               .reduce(new 
RichReduceFunction<Tuple2<String,Double>>() {
                                @Override
                                public Tuple2<String, Double> 
reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2){
                                        return null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
index 1f653c0..1020c8b 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/UnionPropertyPropagationTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.record.operators.FileDataSink;
 import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.ReduceOperator;
@@ -174,7 +174,7 @@ public class UnionPropertyPropagationTest extends 
CompilerTestBase {
                });
        }
 
-       public static final class DummyFlatMap extends FlatMapFunction<String, 
Tuple2<String, Integer>> {
+       public static final class DummyFlatMap extends 
RichFlatMapFunction<String, Tuple2<String, Integer>> {
 
                private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
index e04256c..64a4791 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
@@ -31,9 +31,9 @@ import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichJoinFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.compiler.plan.DualInputPlanNode;
 import org.apache.flink.compiler.plan.OptimizedPlan;
@@ -214,7 +214,7 @@ public class WorksetIterationsJavaApiCompilerTest extends 
CompilerTestBase {
                        iter.getWorkset().join(invariantInput)
                                .where(1, 2)
                                .equalTo(1, 2)
-                               .with(new JoinFunction<Tuple3<Long,Long,Long>, 
Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
+                               .with(new 
RichJoinFunction<Tuple3<Long,Long,Long>, Tuple3<Long, Long, Long>, 
Tuple3<Long,Long,Long>>() {
                                        public Tuple3<Long, Long, Long> 
join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
                                                return first;
                                        }
@@ -224,7 +224,7 @@ public class WorksetIterationsJavaApiCompilerTest extends 
CompilerTestBase {
                        result.join(iter.getSolutionSet())
                                .where(1, 0)
                                .equalTo(0, 2)
-                               .with(new JoinFunction<Tuple3<Long, Long, 
Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
+                               .with(new RichJoinFunction<Tuple3<Long, Long, 
Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
                                        public Tuple3<Long, Long, Long> 
join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
                                                return second;
                                        }
@@ -263,7 +263,7 @@ public class WorksetIterationsJavaApiCompilerTest extends 
CompilerTestBase {
                iter.getWorkset().join(invariantInput)
                        .where(1, 2)
                        .equalTo(1, 2)
-                       .with(new JoinFunction<Tuple3<Long,Long,Long>, 
Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
+                       .with(new RichJoinFunction<Tuple3<Long,Long,Long>, 
Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>() {
                                public Tuple3<Long, Long, Long> 
join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
                                        return first;
                                }
@@ -273,7 +273,7 @@ public class WorksetIterationsJavaApiCompilerTest extends 
CompilerTestBase {
                .join(iter.getSolutionSet())
                        .where(1, 0)
                        .equalTo(1, 2)
-                       .with(new JoinFunction<Tuple3<Long, Long, Long>, 
Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
+                       .with(new RichJoinFunction<Tuple3<Long, Long, Long>, 
Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
                                public Tuple3<Long, Long, Long> 
join(Tuple3<Long, Long, Long> first, Tuple3<Long, Long, Long> second) {
                                        return second;
                                }
@@ -282,7 +282,7 @@ public class WorksetIterationsJavaApiCompilerTest extends 
CompilerTestBase {
                        .withConstantSetSecond(joinPreservesSolutionSet ? new 
String[] {"0->0", "1->1", "2->2" } : null);
                        
                DataSet<Tuple3<Long, Long, Long>> nextWorkset = 
joinedWithSolutionSet.groupBy(1, 2)
-                       .reduceGroup(new 
GroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() {
+                       .reduceGroup(new 
RichGroupReduceFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>() {
                                public void reduce(Iterator<Tuple3<Long, Long, 
Long>> values, Collector<Tuple3<Long, Long, Long>> out) {}
                        })
                        .name(NEXT_WORKSET_REDUCER_NAME)
@@ -290,7 +290,7 @@ public class WorksetIterationsJavaApiCompilerTest extends 
CompilerTestBase {
                
                
                DataSet<Tuple3<Long, Long, Long>> nextSolutionSet = 
mapBeforeSolutionDelta ?
-                               joinedWithSolutionSet.map(new 
MapFunction<Tuple3<Long, Long, Long>,Tuple3<Long, Long, Long>>() { public 
Tuple3<Long, Long, Long> map(Tuple3<Long, Long, Long> value) { return value; } 
})
+                               joinedWithSolutionSet.map(new 
RichMapFunction<Tuple3<Long, Long, Long>,Tuple3<Long, Long, Long>>() { public 
Tuple3<Long, Long, Long> map(Tuple3<Long, Long, Long> value) { return value; } 
})
                                        
.name(SOLUTION_DELTA_MAPPER_NAME).withConstantSet("0->0","1->1","2->2") :
                                joinedWithSolutionSet;
                

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java
new file mode 100644
index 0000000..2388db4
--- /dev/null
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyFlatJoinFunction.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.compiler.testfunctions;
+
+import org.apache.flink.api.java.functions.RichFlatJoinFunction;
+import org.apache.flink.util.Collector;
+
+public class DummyFlatJoinFunction<T> extends RichFlatJoinFunction<T, T, T> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public void join(T first, T second, Collector<T> out) {
+               out.collect(null);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyJoinFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyJoinFunction.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyJoinFunction.java
deleted file mode 100644
index 0db075f..0000000
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/DummyJoinFunction.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.compiler.testfunctions;
-
-import org.apache.flink.api.java.functions.JoinFunction;
-
-public class DummyJoinFunction<T> extends JoinFunction<T, T, T> {
-
-       private static final long serialVersionUID = 1L;
-
-       @Override
-       public T join(T first, T second) {
-               return null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
index fe61f25..42275af 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityGroupReducer.java
@@ -21,13 +21,13 @@ package org.apache.flink.compiler.testfunctions;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction.Combinable;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.util.Collector;
 
 
 @Combinable
-public class IdentityGroupReducer<T> extends GroupReduceFunction<T, T> {
+public class IdentityGroupReducer<T> extends RichGroupReduceFunction<T, T> {
 
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
index b6aa40b..29fc2c8 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/IdentityMapper.java
@@ -19,9 +19,9 @@
 
 package org.apache.flink.compiler.testfunctions;
 
-import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 
-public class IdentityMapper<T> extends MapFunction<T, T> {
+public class IdentityMapper<T> extends RichMapFunction<T, T> {
 
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
index 91634cc..7ce267f 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.compiler.testfunctions;
 
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichReduceFunction;
 
-public class SelectOneReducer<T> extends ReduceFunction<T> {
+public class SelectOneReducer<T> extends RichReduceFunction<T> {
 
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
index 26db00e..3f24e65 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/Top1GroupReducer.java
@@ -21,13 +21,13 @@ package org.apache.flink.compiler.testfunctions;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction.Combinable;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.util.Collector;
 
 
 @Combinable
-public class Top1GroupReducer<T> extends GroupReduceFunction<T, T> {
+public class Top1GroupReducer<T> extends RichGroupReduceFunction<T, T> {
 
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
index 51ad75d..736ee14 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
@@ -23,14 +23,13 @@ import java.io.Serializable;
 
 import org.apache.flink.api.java.record.functions.CrossFunction;
 import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
 
-public class DummyCrossStub extends CrossFunction implements Serializable {
+public class DummyCrossStub extends CrossFunction {
        private static final long serialVersionUID = 1L;
 
+
        @Override
-       public void cross(Record record1, Record record2, Collector<Record> 
out) {
-               out.collect(record1);
-               out.collect(record2);
+       public Record cross(Record first, Record second) throws Exception {
+               return first;
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractFunction.java
deleted file mode 100644
index f4b2763..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractFunction.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-import java.io.Serializable;
-
-import org.apache.flink.configuration.Configuration;
-
-/**
- * An abstract stub implementation for user-defined functions. It offers 
default implementations
- * for {@link #open(Configuration)} and {@link #close()}. It also offers 
access to the
- * {@link RuntimeContext} and {@link IterationRuntimeContext}.
- */
-public abstract class AbstractFunction implements Function, Serializable {
-       
-       private static final long serialVersionUID = 1L;
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  Runtime context access
-       // 
--------------------------------------------------------------------------------------------
-       
-       private transient RuntimeContext runtimeContext;
-
-       public void setRuntimeContext(RuntimeContext t) {
-               if (this.runtimeContext == null) {
-                       this.runtimeContext = t;
-               } else {
-                       throw new IllegalStateException("Error: The runtime 
context has already been set.");
-               }
-       }
-       
-       public RuntimeContext getRuntimeContext() {
-               if (this.runtimeContext != null) {
-                       return this.runtimeContext;
-               } else {
-                       throw new IllegalStateException("The runtime context 
has not been initialized.");
-               }
-       }
-       
-       public IterationRuntimeContext getIterationRuntimeContext() {
-               if (this.runtimeContext == null) {
-                       throw new IllegalStateException("The runtime context 
has not been initialized.");
-               } else if (this.runtimeContext instanceof 
IterationRuntimeContext) {
-                       return (IterationRuntimeContext) this.runtimeContext;
-               } else {
-                       throw new IllegalStateException("This stub is not part 
of an iteration step function.");
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  Default life cycle methods
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public void open(Configuration parameters) throws Exception {}
-
-       @Override
-       public void close() throws Exception {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
new file mode 100644
index 0000000..07b957d
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * An abstract stub implementation for user-defined functions. It offers 
default implementations
+ * for {@link #open(Configuration)} and {@link #close()}. It also offers 
access to the
+ * {@link RuntimeContext} and {@link IterationRuntimeContext}.
+ */
+public abstract class AbstractRichFunction implements RichFunction, 
Serializable {
+       
+       private static final long serialVersionUID = 1L;
+       
+       // 
--------------------------------------------------------------------------------------------
+       //  Runtime context access
+       // 
--------------------------------------------------------------------------------------------
+       
+       private transient RuntimeContext runtimeContext;
+
+       public void setRuntimeContext(RuntimeContext t) {
+               if (this.runtimeContext == null) {
+                       this.runtimeContext = t;
+               } else {
+                       throw new IllegalStateException("Error: The runtime 
context has already been set.");
+               }
+       }
+       
+       public RuntimeContext getRuntimeContext() {
+               if (this.runtimeContext != null) {
+                       return this.runtimeContext;
+               } else {
+                       throw new IllegalStateException("The runtime context 
has not been initialized.");
+               }
+       }
+       
+       public IterationRuntimeContext getIterationRuntimeContext() {
+               if (this.runtimeContext == null) {
+                       throw new IllegalStateException("The runtime context 
has not been initialized.");
+               } else if (this.runtimeContext instanceof 
IterationRuntimeContext) {
+                       return (IterationRuntimeContext) this.runtimeContext;
+               } else {
+                       throw new IllegalStateException("This stub is not part 
of an iteration step function.");
+               }
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       //  Default life cycle methods
+       // 
--------------------------------------------------------------------------------------------
+       
+       @Override
+       public void open(Configuration parameters) throws Exception {}
+
+       @Override
+       public void close() throws Exception {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
new file mode 100644
index 0000000..5c200af
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.flink.util.Collector;
+
+
+public interface CoGroupFunction<V1, V2, O> extends Function, Serializable {
+       
+       /**
+        * This method must be implemented to provide a user implementation of a
+        * coGroup. It is called for each two key-value pairs that share the 
same
+        * key and come from different inputs.
+        * 
+        * @param first The records from the first input which were paired with 
the key.
+        * @param second The records from the second input which were paired 
with the key.
+        * @param out A collector that collects all output pairs.
+        */
+       void coGroup(Iterator<V1> first, Iterator<V2> second, Collector<O> out) 
throws Exception;
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
new file mode 100644
index 0000000..d72c4c8
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * Generic interface used for combiners.
+ */
+public interface CombineFunction<T> extends Function, Serializable {
+
+       T combine(Iterator<T> records) throws Exception;
+}

Reply via email to