[FLINK-701] Several cleanups after SAM refactoring.
  - Lambda detection compiles on earlier java versions
  - Add lambda detection test.
  - Fix JavaDocs


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/bc89e911
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/bc89e911
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/bc89e911

Branch: refs/heads/release-0.6
Commit: bc89e911b2cd433bb841beab9e8e513ca0c3525d
Parents: 22b24f2
Author: Stephan Ewen <[email protected]>
Authored: Thu Jul 31 19:46:14 2014 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Thu Jul 31 20:18:00 2014 +0200

----------------------------------------------------------------------
 .../examples/SpargelConnectedComponents.java    |   4 +-
 .../spargel/java/examples/SpargelPageRank.java  |   8 +-
 .../SpargelPageRankCountingVertices.java        |  11 +-
 .../apache/flink/client/web/JobsServlet.java    |   1 -
 .../org/apache/flink/compiler/PactCompiler.java |   6 +-
 .../apache/flink/compiler/CompilerTestBase.java |   6 +-
 .../flink/compiler/util/DummyCrossStub.java     |   4 -
 .../common/functions/AbstractRichFunction.java  |   8 +-
 .../api/common/functions/CoGroupFunction.java   |  38 +++++--
 .../api/common/functions/CombineFunction.java   |  24 ++++-
 .../api/common/functions/CrossFunction.java     |  38 +++++--
 .../api/common/functions/ExecutionContext.java  |  51 ----------
 .../api/common/functions/FilterFunction.java    |  25 ++++-
 .../common/functions/FlatCombineFunction.java   |  22 +++-
 .../api/common/functions/FlatJoinFunction.java  |  45 +++++++-
 .../api/common/functions/FlatMapFunction.java   |  23 +++--
 .../flink/api/common/functions/Function.java    |   1 -
 .../common/functions/GenericCollectorMap.java   |  10 +-
 .../common/functions/GroupReduceFunction.java   |  34 +++++--
 .../api/common/functions/JoinFunction.java      |  40 +++++++-
 .../flink/api/common/functions/MapFunction.java |  21 +++-
 .../api/common/functions/ReduceFunction.java    |  26 ++++-
 .../api/common/functions/RichFunction.java      |   1 -
 .../api/common/functions/RuntimeContext.java    |  28 -----
 .../common/functions/util/FunctionUtils.java    |  38 ++++---
 .../flink/api/common/operators/Union.java       |   1 -
 .../operators/base/BulkIterationBase.java       |   1 +
 .../base/CollectorMapOperatorBase.java          |   2 +-
 .../api/common/operators/util/OperatorUtil.java |   3 +-
 .../common/operators/util/OperatorUtilTest.java |   5 +
 .../java/org/apache/flink/api/java/DataSet.java |  49 ++++-----
 .../flink/api/java/ExecutionEnvironment.java    |   2 +-
 .../api/java/functions/FlatMapIterator.java     |   2 +-
 .../api/java/functions/RichCoGroupFunction.java |  39 +------
 .../api/java/functions/RichCrossFunction.java   |  35 +------
 .../api/java/functions/RichFilterFunction.java  |  28 +----
 .../java/functions/RichFlatCombineFunction.java |   9 ++
 .../java/functions/RichFlatJoinFunction.java    |  43 +-------
 .../api/java/functions/RichFlatMapFunction.java |  28 +----
 .../java/functions/RichGroupReduceFunction.java |  32 +-----
 .../api/java/functions/RichJoinFunction.java    |  12 ++-
 .../api/java/functions/RichMapFunction.java     |  29 +-----
 .../api/java/functions/RichReduceFunction.java  |  35 +------
 .../api/java/operators/CoGroupOperator.java     |   2 +-
 .../flink/api/java/operators/CrossOperator.java |   2 +-
 .../api/java/operators/DistinctOperator.java    |  28 ++---
 .../api/java/operators/GroupReduceOperator.java |   7 +-
 .../flink/api/java/operators/Grouping.java      |   4 +-
 .../flink/api/java/operators/JoinOperator.java  |   6 +-
 .../api/java/operators/SortedGrouping.java      |   4 +-
 .../api/java/operators/UnsortedGrouping.java    |   2 +-
 .../PlanUnwrappingReduceGroupOperator.java      |   2 +-
 .../java/record/functions/CoGroupFunction.java  |   2 +-
 .../java/record/functions/CrossFunction.java    |   6 +-
 .../api/java/record/functions/JoinFunction.java |  15 +--
 .../api/java/record/functions/MapFunction.java  |   2 +-
 .../java/record/functions/ReduceFunction.java   |   2 +-
 .../api/java/record/operators/MapOperator.java  |   1 +
 .../DeltaIterationTranslationTest.java          |   2 +-
 .../translation/DistrinctTranslationTest.java   |  57 +++++++++++
 .../java/type/extractor/TypeExtractorTest.java  |   4 +
 .../javaApiOperators/lambdas/CoGroupITCase.java |   1 +
 .../javaApiOperators/lambdas/CrossITCase.java   |   1 +
 .../javaApiOperators/lambdas/FilterITCase.java  |  70 ++-----------
 .../lambdas/FlatJoinITCase.java                 |   1 +
 .../javaApiOperators/lambdas/FlatMapITCase.java |   1 +
 .../lambdas/GroupReduceITCase.java              |   1 +
 .../javaApiOperators/lambdas/JoinITCase.java    |   1 +
 .../lambdas/LambdaExtractionTest.java           |  82 +++++++++++++++
 .../javaApiOperators/lambdas/MapITCase.java     |   1 +
 .../javaApiOperators/lambdas/ReduceITCase.java  | 102 +++++--------------
 .../SolutionSetFastUpdateOutputCollector.java   |   6 +-
 .../io/SolutionSetUpdateOutputCollector.java    |   2 +-
 .../task/AbstractIterativePactTask.java         |   4 +-
 .../iterative/task/IterationHeadPactTask.java   |   7 +-
 .../task/IterationIntermediatePactTask.java     |   9 +-
 .../task/IterationSynchronizationSinkTask.java  |   4 +-
 .../iterative/task/IterationTailPactTask.java   |   9 +-
 .../operators/RuntimeExecutionContext.java      |  56 ----------
 .../flink/test/operators/CrossITCase.java       |  18 +---
 80 files changed, 658 insertions(+), 734 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
index a4ba6fa..0f1f26d 100644
--- 
a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
+++ 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelConnectedComponents.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.spargel.java.examples;
 
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.spargel.java.MessageIterator;
 import org.apache.flink.spargel.java.MessagingFunction;
@@ -70,7 +70,7 @@ public class SpargelConnectedComponents {
         * A map function that takes a Long value and creates a 2-tuple out of 
it:
         * <pre>(Long value) -> (value, value)</pre>
         */
-       public static final class IdAssigner extends RichMapFunction<Long, 
Tuple2<Long, Long>> {
+       public static final class IdAssigner implements MapFunction<Long, 
Tuple2<Long, Long>> {
                @Override
                public Tuple2<Long, Long> map(Long value) {
                        return new Tuple2<Long, Long>(value, value);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
index 9dfc327..6d3dc95 100644
--- 
a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
+++ 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRank.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.spargel.java.examples;
 
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.spargel.java.MessageIterator;
@@ -48,7 +48,7 @@ public class SpargelPageRank {
                
                // enumerate some sample edges and assign an initial uniform 
probability (rank)
                DataSet<Tuple2<Long, Double>> intialRanks = 
env.generateSequence(1, numVertices)
-                                                               .map(new 
RichMapFunction<Long, Tuple2<Long, Double>>() {
+                                                               .map(new 
MapFunction<Long, Tuple2<Long, Double>>() {
                                                                        public 
Tuple2<Long, Double> map(Long value) {
                                                                                
return new Tuple2<Long, Double>(value, 1.0/numVertices);
                                                                        }
@@ -56,7 +56,7 @@ public class SpargelPageRank {
                
                // generate some random edges. the transition probability on 
each edge is 1/num-out-edges of the source vertex
                DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = 
env.generateSequence(1, numVertices)
-                                                               .flatMap(new 
RichFlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
+                                                               .flatMap(new 
FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
                                                                        public 
void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
                                                                                
int numOutEdges = (int) (Math.random() * (numVertices / 2));
                                                                                
for (int i = 0; i < numOutEdges; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
index 43c0b84..01d2cd7 100644
--- 
a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
+++ 
b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/examples/SpargelPageRankCountingVertices.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.spargel.java.examples;
 
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.RichMapFunction;
-import org.apache.flink.api.java.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
@@ -53,7 +54,7 @@ public class SpargelPageRankCountingVertices {
                
                // generate some random edges. the transition probability on 
each edge is 1/num-out-edges of the source vertex
                DataSet<Tuple3<Long, Long, Double>> edgesWithProbability = 
env.generateSequence(1, NUM_VERTICES)
-                                                               .flatMap(new 
RichFlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
+                                                               .flatMap(new 
FlatMapFunction<Long, Tuple3<Long, Long, Double>>() {
                                                                        public 
void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
                                                                                
int numOutEdges = (int) (Math.random() * (NUM_VERTICES / 2));
                                                                                
for (int i = 0; i < numOutEdges; i++) {
@@ -67,12 +68,12 @@ public class SpargelPageRankCountingVertices {
                
                // count the number of vertices
                DataSet<Long> count = vertices
-                       .map(new RichMapFunction<Long, Long>() {
+                       .map(new MapFunction<Long, Long>() {
                                public Long map(Long value) {
                                        return 1L;
                                }
                        })
-                       .reduce(new RichReduceFunction<Long>() {
+                       .reduce(new ReduceFunction<Long>() {
                                public Long reduce(Long value1, Long value2) {
                                        return value1 + value2;
                                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java 
b/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
index c62f0d7..cd53115 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobsServlet.java
@@ -180,7 +180,6 @@ public class JobsServlet extends HttpServlet {
                // parse the request
                ServletFileUpload uploadHandler = new 
ServletFileUpload(fileItemFactory);
                try {
-                       @SuppressWarnings("unchecked")
                        Iterator<FileItem> itr = ((List<FileItem>) 
uploadHandler.parseRequest(req)).iterator();
 
                        // go over the form fields and look for our file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java 
b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index b55dea0..e22a365 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -37,7 +37,6 @@ import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Union;
 import org.apache.flink.api.common.operators.base.BulkIterationBase;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
-import org.apache.flink.api.common.operators.base.CollectorMapOperatorBase;
 import org.apache.flink.api.common.operators.base.CrossOperatorBase;
 import org.apache.flink.api.common.operators.base.DeltaIterationBase;
 import org.apache.flink.api.common.operators.base.FilterOperatorBase;
@@ -648,6 +647,7 @@ public class PactCompiler {
                        this.forceDOP = forceDOP;
                }
 
+               @SuppressWarnings("deprecation")
                @Override
                public boolean preVisit(Operator<?> c) {
                        // check if we have been here before
@@ -671,8 +671,8 @@ public class PactCompiler {
                        else if (c instanceof MapOperatorBase) {
                                n = new MapNode((MapOperatorBase<?, ?, ?>) c);
                        }
-                       else if (c instanceof CollectorMapOperatorBase) {
-                               n = new 
CollectorMapNode((CollectorMapOperatorBase<?, ?, ?>) c);
+                       else if (c instanceof 
org.apache.flink.api.common.operators.base.CollectorMapOperatorBase) {
+                               n = new 
CollectorMapNode((org.apache.flink.api.common.operators.base.CollectorMapOperatorBase<?,
 ?, ?>) c);
                        }
                        else if (c instanceof FlatMapOperatorBase) {
                                n = new FlatMapNode((FlatMapOperatorBase<?, ?, 
?>) c);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java 
b/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
index b2c163b..0115c31 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
@@ -27,7 +27,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
@@ -179,7 +179,7 @@ public abstract class CompilerTestBase implements 
java.io.Serializable {
                }
                
                @SuppressWarnings("unchecked")
-               public <T extends PlanNode> T getNode(String name, Class<? 
extends RichFunction> stubClass) {
+               public <T extends PlanNode> T getNode(String name, Class<? 
extends Function> stubClass) {
                        List<PlanNode> nodes = this.map.get(name);
                        if (nodes == null || nodes.isEmpty()) {
                                throw new RuntimeException("No node found with 
the given name and stub class.");
@@ -241,7 +241,7 @@ public abstract class CompilerTestBase implements 
java.io.Serializable {
                }
                
                @SuppressWarnings("unchecked")
-               public <T extends Operator<?>> T getNode(String name, Class<? 
extends RichFunction> stubClass) {
+               public <T extends Operator<?>> T getNode(String name, Class<? 
extends Function> stubClass) {
                        List<Operator<?>> nodes = this.map.get(name);
                        if (nodes == null || nodes.isEmpty()) {
                                throw new RuntimeException("No node found with 
the given name and stub class.");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
 
b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
index 736ee14..d6ecc40 100644
--- 
a/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
+++ 
b/flink-compiler/src/test/java/org/apache/flink/compiler/util/DummyCrossStub.java
@@ -16,18 +16,14 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.util;
 
-import java.io.Serializable;
-
 import org.apache.flink.api.java.record.functions.CrossFunction;
 import org.apache.flink.types.Record;
 
 public class DummyCrossStub extends CrossFunction {
        private static final long serialVersionUID = 1L;
 
-
        @Override
        public Record cross(Record first, Record second) throws Exception {
                return first;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
index 07b957d..981de14 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
@@ -24,9 +23,10 @@ import java.io.Serializable;
 import org.apache.flink.configuration.Configuration;
 
 /**
- * An abstract stub implementation for user-defined functions. It offers 
default implementations
- * for {@link #open(Configuration)} and {@link #close()}. It also offers 
access to the
- * {@link RuntimeContext} and {@link IterationRuntimeContext}.
+ * An abstract stub implementation for rich user-defined functions.
+ * Rich functions have additional methods for initialization ({@link 
#open(Configuration)}) and
+ * teardown ({@link #close()}), as well as access to their runtime execution 
context via
+ * {@link #getRuntimeContext()}.
  */
 public abstract class AbstractRichFunction implements RichFunction, 
Serializable {
        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
index 5c200af..cc8fe78 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/CoGroupFunction.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
@@ -24,18 +23,41 @@ import java.util.Iterator;
 
 import org.apache.flink.util.Collector;
 
-
+/**
+ * The interface for CoGroup functions. CoGroup functions combine two data 
sets by first grouping each data set
+ * after a key and then "joining" the groups by calling this function with the 
two sets for each key. 
+ * If a key is present in only one of the two inputs, it may be that one of 
the groups is empty.
+ * <p>
+ * The basic syntax for using CoGroup on two data sets is as follows:
+ * <pre><blockquote>
+ * DataSet<X> set1 = ...;
+ * DataSet<Y> set2 = ...;
+ * 
+ * 
set1.coGroup(set2).where(<key-definition>).equalTo(<key-definition>).with(new 
MyCoGroupFunction());
+ * </blockquote></pre>
+ * <p>
+ * {@code set1} is here considered the first input, {@code set2} the second 
input.
+ * <p>
+ * Some keys may only be contained in one of the two original data sets. In 
that case, the CoGroup function is invoked
+ * with an empty input for the side of the data set that did not contain 
elements with that specific key.
+ * 
+ * @param <V1> The data type of the first input data set.
+ * @param <V2> The data type of the second input data set.
+ * @param <O> The data type of the returned elements.
+ */
 public interface CoGroupFunction<V1, V2, O> extends Function, Serializable {
        
        /**
         * This method must be implemented to provide a user implementation of a
-        * coGroup. It is called for each two key-value pairs that share the 
same
-        * key and come from different inputs.
+        * coGroup. It is called for each pair of element groups where the 
elements share the
+        * same key.
+        * 
+        * @param first The records from the first input.
+        * @param second The records from the second.
+        * @param out A collector to return elements.
         * 
-        * @param first The records from the first input which were paired with 
the key.
-        * @param second The records from the second input which were paired 
with the key.
-        * @param out A collector that collects all output pairs.
+        * @throws Exception The function may throw Exceptions, which will 
cause the program to cancel,
+        *                   and may trigger the recovery logic.
         */
        void coGroup(Iterator<V1> first, Iterator<V2> second, Collector<O> out) 
throws Exception;
-       
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
index d72c4c8..aed7fae 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/CombineFunction.java
@@ -16,16 +16,34 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
 import java.util.Iterator;
 
 /**
- * Generic interface used for combiners.
+ * Generic interface used for combine functions ("combiners"). Combiners act 
as auxiliaries to a {@link GroupReduceFunction}
+ * and "pre-reduce" the data. The combine functions typically do not see the 
entire group of elements, but
+ * only a sub-group.
+ * <p>
+ * Combine functions are frequently helpful in increasing the program 
efficiency, because they allow the system to
+ * reduce the data volume earlier, before the entire groups have been 
collected.
+ * <p>
+ * This special variant of the combine function reduces the group of elements 
into a single element. A variant
+ * that can return multiple values per group is defined in {@link 
FlatCombineFunction}.
+ * 
+ * @param <T> The data type processed by the combine function.
  */
 public interface CombineFunction<T> extends Function, Serializable {
 
-       T combine(Iterator<T> records) throws Exception;
+       /**
+        * The combine method, called (potentially multiple times) with 
subgroups of elements.
+        * 
+        * @param values The elements to be combined.
+        * @return The single resulting value.
+        * 
+        * @throws Exception The function may throw Exceptions, which will 
cause the program to cancel,
+        *                   and may trigger the recovery logic.
+        */
+       T combine(Iterator<T> values) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
index 0c8bc97..5e2c122 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
@@ -16,27 +16,43 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
 
-
 /**
- * @param <IN1> First input type
- * @param <IN2> Second input type
- * @param <OUT> Output type
+ * Interface for Cross functions. Cross functions are applied to the Cartesian 
product of their inputs
+ * and are called for each pair of elements.
+ * 
+ * They are optional, a means of convenience that can be used to directly 
manipulate the
+ * pair of elements, instead of processing 2-tuples that contain the pairs.
+ * <p>
+ * The basic syntax for using Cross on two data sets is as follows:
+ * <pre><blockquote>
+ * DataSet<X> set1 = ...;
+ * DataSet<Y> set2 = ...;
+ * 
+ * set1.cross(set2).with(new MyCrossFunction());
+ * </blockquote></pre>
+ * <p>
+ * {@code set1} is here considered the first input, {@code set2} the second 
input.
+ * 
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
  */
 public interface CrossFunction<IN1, IN2, OUT> extends Function, Serializable {
 
        /**
-        * User defined function for the cross operator.
+        * Cross UDF method. Called once per pair of elements in the Cartesian 
product of the inputs.
+        * 
+        * @param val1 Element from first input.
+        * @param val2 Element from the second input.
+        * @return The result element.
         * 
-        * @param record1 Record from first input
-        * @param record2 Record from the second input
-        * @return result of cross UDF.
-        * @throws Exception
+        * @throws Exception The function may throw Exceptions, which will 
cause the program to cancel,
+        *                   and may trigger the recovery logic.
         */
-       OUT cross(IN1 record1, IN2 record2) throws Exception;
+       OUT cross(IN1 val1, IN2 val2) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/ExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/ExecutionContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/ExecutionContext.java
deleted file mode 100644
index 9540dc1..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/ExecutionContext.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-
-/**
- * The execution context provides basic information about the parallel runtime 
context
- * in which a stub instance lives. Such information includes the current 
number of
- * parallel stub instances, the stub's parallel task index, the pact name, or 
the iteration context.
- */
-public interface ExecutionContext {
-       
-       /**
-        * Gets the name of the task. This is the name supplied to the contract 
upon instantiation. If
-        * no name was given at contract instantiation time, a default name 
will be returned.
-        * 
-        * @return The task's name.
-        */
-       String getTaskName();
-       
-       /**
-        * Gets the number of parallel subtasks in which the stub is executed.
-        * 
-        * @return The number of parallel subtasks in which the stub is 
executed.
-        */
-       int getNumberOfSubtasks();
-       
-       /**
-        * Gets the subtask's parallel task number.
-        * 
-        * @return The subtask's parallel task number.
-        */
-       int getSubtaskIndex();
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
index 2f68477..2380f60 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
@@ -16,18 +16,33 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
+
 import java.io.Serializable;
 
+/**
+ * Base interface for Filter functions. A filter function takes elements and 
evaluates a
+ * predicate on them to decide whether to keep the element, or to discard it.
+ * <p>
+ * The basic syntax for using a FilterFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ * 
+ * DataSet<X> result = input.filter(new MyFilterFunction());
+ * </blockquote></pre>
+ * 
+ * @param <T> The type of the filtered elements.
+ */
 public interface FilterFunction<T> extends Function, Serializable {
        
        /**
-        * User defined function for a filter.
+        * The filter function that evaluates the predicate.
+        * 
+        * @param value The value to be filtered.
+        * @return True for values that should be retained, false for values to 
be filtered out.
         * 
-        * @param value Incoming tuples
-        * @return true for tuples that are allowed to pass the filter
-        * @throws Exception
+        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+        *                   to fail and may trigger recovery.
         */
        boolean filter(T value) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
index b2c8f30..edf1614 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
@@ -25,9 +24,28 @@ import java.util.Iterator;
 import org.apache.flink.util.Collector;
 
 /**
- * Generic interface used for combiners.
+ * Generic interface used for combine functions ("combiners"). Combiners act 
as auxiliaries to a {@link GroupReduceFunction}
+ * and "pre-reduce" the data. The combine functions typically do not see the 
entire group of elements, but
+ * only a sub-group.
+ * <p>
+ * Combine functions are frequently helpful in increasing the program 
efficiency, because they allow the system to
+ * reduce the data volume earlier, before the entire groups have been 
collected.
+ * <p>
+ * This special variant of the combine function supports returning more than 
one element per group.
+ * It is frequently less efficient to use than the {@link CombineFunction}.
+ * 
+ * @param <T> The data type processed by the combine function.
  */
 public interface FlatCombineFunction<T> extends Function, Serializable {
 
+       /**
+        * The combine method, called (potentially multiple times) with 
subgroups of elements.
+        * 
+        * @param values The elements to be combined.
+        * @param out The collector to use to return values from the function.
+        * 
+        * @throws Exception The function may throw Exceptions, which will 
cause the program to cancel,
+        *                   and may trigger the recovery logic.
+        */
        void combine(Iterator<T> values, Collector<T> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
index 6a6b971..3ad2c82 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
@@ -16,15 +16,54 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import org.apache.flink.util.Collector;
 
 import java.io.Serializable;
 
-
+/**
+ * Interface for Join functions. Joins combine two data sets by joining their
+ * elements on specified keys. This function is called with each pair of 
joining elements.
+ * <p>
+ * This particular variant of the join function supports returning zero, one, 
or more result values
+ * per pair of joining values. 
+ * <p>
+ * By default, the join follows strictly the semantics of an "inner join" in 
SQL,
+ * meaning that elements are 
filtered out
+ * if their key is not contained in the other data set.
+ * <p>
+ * The basic syntax for using Join on two data sets is as follows:
+ * <pre><blockquote>
+ * DataSet<X> set1 = ...;
+ * DataSet<Y> set2 = ...;
+ * 
+ * set1.join(set2).where(<key-definition>).equalTo(<key-definition>).with(new 
MyJoinFunction());
+ * </blockquote></pre>
+ * <p>
+ * {@code set1} is here considered the first input, {@code set2} the second 
input.
+ * <p>
+ * The Join function is an optional part of a join operation. If no 
JoinFunction is provided,
+ * the result of the operation is a sequence of 2-tuples, where the elements 
in the tuple are those that
+ * the JoinFunction would have been invoked with.
+ * <p>
+ * Note: You can use a {@link CoGroupFunction} to perform an outer join.
+ * 
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
+ */
 public interface FlatJoinFunction<IN1, IN2, OUT> extends Function, 
Serializable {
 
-       void join (IN1 left, IN2 right, Collector<OUT> out) throws Exception;
+       /**
+        * The join method, called once per joined pair of elements.
+        * 
+        * @param first The element from first input.
+        * @param second The element from second input.
+        * @param out The collector used to return zero, one, or more elements.
+        * 
+        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+        *                   to fail and may trigger recovery.
+        */
+       void join (IN1 first, IN2 second, Collector<OUT> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
index a8696cf..0c32eae 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
@@ -16,27 +16,36 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import org.apache.flink.util.Collector;
 
 import java.io.Serializable;
 
-
 /**
- *
- * @param <T>
- * @param <O>
+ * Interface for flatMap functions. FlatMap functions take elements and transform 
them
+ * into zero, one, or more elements. Typical applications can be splitting 
elements, or unnesting lists
+ * and arrays. Operations that produce strictly one result element 
per input element can also
+ * use the {@link MapFunction}.
+ * <p>
+ * The basic syntax for using a FlatMapFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ * 
+ * DataSet<Y> result = input.flatMap(new MyFlatMapFunction());
+ * </blockquote></pre>
+ * 
+ * @param <T> Type of the input elements.
+ * @param <O> Type of the returned elements.
  */
 public interface FlatMapFunction<T, O> extends Function, Serializable {
 
        /**
-        * The core method of FlatMappable. Takes an element from the input 
data set and transforms
+        * The core method of the FlatMapFunction. Takes an element from the 
input data set and transforms
         * it into zero, one, or more elements.
         *
         * @param value The input value.
-        * @param out The collector for for emitting result values.
+        * @param out The collector for returning result values.
         *
         * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
         *                   to fail and may trigger recovery.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
index c2a201f..9e9d88d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
@@ -24,5 +24,4 @@ package org.apache.flink.api.common.functions;
  * can be called as Java 8 lambdas
  */
 public interface Function {
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
index 41cfa1d..edac4a8 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
@@ -16,13 +16,17 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import org.apache.flink.util.Collector;
 
-
-
+/**
+ * Variant of the flat map that is used for backwards compatibility in the 
deprecated Record API.
+ *
+ * @param <T> The input data type.
+ * @param <O> The result data type.
+ */
+@Deprecated
 public interface GenericCollectorMap<T, O> extends RichFunction {
        
        void map(T record, Collector<O> out) throws Exception;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
index 984d1fd..5fa959c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
@@ -24,22 +23,35 @@ import java.util.Iterator;
 
 import org.apache.flink.util.Collector;
 
-
 /**
- *
- * @param <T> Incoming types
- * @param <O> Outgoing types
+ * The interface for group reduce functions. GroupReduceFunctions process 
groups of elements.
+ * They may aggregate them to a single value, or produce multiple result 
values for each group.
+ * The group may be defined by sharing a common grouping key, or the group may 
simply be
+ * all elements of a data set.
+ * <p>
+ * For a reduce function that works incrementally by combining always two 
elements, see 
+ * {@link ReduceFunction}.
+ * <p>
+ * The basic syntax for using a grouped GroupReduceFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ * 
+ * DataSet<X> result = input.groupBy(<key-definition>).reduceGroup(new 
MyGroupReduceFunction());
+ * </blockquote></pre>
+ * 
+ * @param <T> Type of the elements that this function processes.
+ * @param <O> The type of the elements returned by the user-defined function.
  */
 public interface GroupReduceFunction<T, O> extends Function, Serializable {
+       
        /**
+        * The reduce method. The function receives one call per group of 
elements.
         * 
-        * The central function to be implemented for a reducer. The function 
receives per call one
-        * key and all the values that belong to that key. Each key is 
guaranteed to be processed by exactly
-        * one function call across all involved instances across all computing 
nodes.
-        * 
-        * @param records All records that belong to the given input key.
+        * @param values All records that belong to the given input key.
         * @param out The collector to hand results to.
-        * @throws Exception
+        *
+        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+        *                   to fail and may trigger recovery.
         */
        void reduce(Iterator<T> values, Collector<O> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
index 02f526a..9a8943c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
@@ -16,13 +16,49 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import java.io.Serializable;
 
-
+/**
+ * Interface for Join functions. Joins combine two data sets by joining their
+ * elements on specified keys. This function is called with each pair of 
joining elements.
+ * <p>
+ * By default, the join follows strictly the semantics of an "inner join" in 
SQL,
+ * meaning that elements are 
filtered out
+ * if their key is not contained in the other data set.
+ * <p>
+ * The basic syntax for using Join on two data sets is as follows:
+ * <pre><blockquote>
+ * DataSet<X> set1 = ...;
+ * DataSet<Y> set2 = ...;
+ * 
+ * set1.join(set2).where(<key-definition>).equalTo(<key-definition>).with(new 
MyJoinFunction());
+ * </blockquote></pre>
+ * <p>
+ * {@code set1} is here considered the first input, {@code set2} the second 
input.
+ * <p>
+ * The Join function is an optional part of a join operation. If no 
JoinFunction is provided,
+ * the result of the operation is a sequence of 2-tuples, where the elements 
in the tuple are those that
+ * the JoinFunction would have been invoked with.
+ * <p>
+ * Note: You can use a {@link CoGroupFunction} to perform an outer join.
+ * 
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
+ */
 public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
 
+       /**
+        * The join method, called once per joined pair of elements.
+        * 
+        * @param first The element from first input.
+        * @param second The element from second input.
+        * @return The resulting element.
+        * 
+        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+        *                   to fail and may trigger recovery.
+        */
        OUT join(IN1 first, IN2 second) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
index 4e2520d..dccc980 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
@@ -16,16 +16,31 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
-
 import java.io.Serializable;
 
+/**
+ * Base interface for Map functions. Map functions take elements and transform 
them,
+ * element wise. A Map function always produces a single result element for 
each input element.
+ * Typical applications are parsing elements, converting data types, or 
projecting out fields.
+ * Operations that produce multiple result elements from a single input 
element can be implemented
+ * using the {@link FlatMapFunction}.
+ * <p>
+ * The basic syntax for using a MapFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ * 
+ * DataSet<Y> result = input.map(new MyMapFunction());
+ * </blockquote></pre>
+ * 
+ * @param <T> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
 public interface MapFunction<T, O> extends Function, Serializable {
 
        /**
-        * The core method of Mappable. Takes an element from the input data 
set and transforms
+        * The mapping method. Takes an element from the input data set and 
transforms
         * it into exactly one element.
         *
         * @param value The input value.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
index 04f690a..7a61c97 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
@@ -16,16 +16,36 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
-
 import java.io.Serializable;
 
+/**
+ * Base interface for Reduce functions. Reduce functions combine groups of 
elements to
+ * a single value, by taking always two elements and combining them into one. 
Reduce functions
+ * may be used on entire data sets, or on grouped data sets. In the latter 
case, each group is reduced
+ * individually.
+ * <p>
+ * For reduce functions that work on an entire group at the same time (such 
as the 
+ * MapReduce/Hadoop-style reduce), see {@link GroupReduceFunction}. In the 
general case,
+ * ReduceFunctions are considered faster, because they allow the system to use 
more efficient
+ * execution strategies.
+ * <p>
+ * The basic syntax for using a grouped ReduceFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ * 
+ * DataSet<X> result = input.groupBy(<key-definition>).reduce(new 
MyReduceFunction());
+ * </blockquote></pre>
+ * <p>
+ * Like all functions, the ReduceFunction needs to be serializable, as defined 
in {@link java.io.Serializable}.
+ * 
+ * @param <T> Type of the elements that this function processes.
+ */
 public interface ReduceFunction<T> extends Function, Serializable {
 
        /**
-        * The core method of Reducible, combining two values into one value of 
the same type.
+        * The core method of ReduceFunction, combining two values into one 
value of the same type.
         * The reduce function is consecutively applied to all values of a 
group until only a single value remains.
         *
         * @param value1 The first value to combine.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
index ffc3ac2..8f53252 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
 import org.apache.flink.configuration.Configuration;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 3cf30ff..e18858b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -105,34 +105,6 @@ public interface RuntimeContext {
         * Convenience function to create a counter object for histograms.
         */
        Histogram getHistogram(String name);
-
-//     /**
-//      * I propose to remove this and only keep the other more explicit 
functions
-//      * (to add or get an accumulator object)
-//      * 
-//      * Get an existing or new named accumulator object. Use this function 
to get
-//      * an counter for an custom accumulator type. For the integrated
-//      * accumulators you better use convenience functions (e.g. 
getIntCounter).
-//      * 
-//      * There is no need to register accumulators - they will be created 
when a
-//      * UDF asks the first time for a counter that does not exist yet 
locally.
-//      * This implies that there can be conflicts when a counter is requested 
with
-//      * the same name but with different types, either in the same UDF or in
-//      * different. In the last case the conflict occurs during merging.
-//      * 
-//      * @param name
-//      * @param accumulatorClass
-//      *            If the accumulator was not created previously
-//      * @return
-//      */
-//     <V, A> Accumulator<V, A> getAccumulator(String name,
-//                     Class<? extends Accumulator<V, A>> accumulatorClass);
-//
-//     /**
-//      * See getAccumulable
-//      */
-//     <T> SimpleAccumulator<T> getSimpleAccumulator(String name,
-//                     Class<? extends SimpleAccumulator<T>> accumulatorClass);
        
        // 
--------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
index bc4ffd0..6455c77 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
@@ -18,18 +18,15 @@
 
 package org.apache.flink.api.common.functions.util;
 
-
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 
-import java.lang.invoke.SerializedLambda;
 import java.lang.reflect.Method;
 
 public class FunctionUtils {
 
-
        public static void openFunction (Function function, Configuration 
parameters) throws Exception{
                if (function instanceof RichFunction) {
                        RichFunction richFunction = (RichFunction) function;
@@ -61,21 +58,30 @@ public class FunctionUtils {
                }
        }
 
-       public static boolean isSerializedLambdaFunction(Function function) {
-               Class<?> clazz = function.getClass();
-               try {
-                       Method replaceMethod = 
clazz.getDeclaredMethod("writeReplace");
-                       replaceMethod.setAccessible(true);
-                       Object serializedForm = replaceMethod.invoke(function);
-                       if (serializedForm instanceof SerializedLambda) {
-                               return true;
+       public static boolean isLambdaFunction(Function function) {
+               if (function == null) {
+                       throw new IllegalArgumentException();
+               }
+               
+               for (Class<?> clazz = function.getClass(); clazz != null; clazz 
= clazz.getSuperclass()) {
+                       try {
+                               Method replaceMethod = 
clazz.getDeclaredMethod("writeReplace");
+                               replaceMethod.setAccessible(true);
+                               Object serialVersion = 
replaceMethod.invoke(function);
+                               
+                               if 
(serialVersion.getClass().getName().equals("java.lang.invoke.SerializedLambda"))
 {
+                                       return true;
+                               }
                        }
-                       else {
-                               return false;
+                       catch (NoSuchMethodException e) {
+                               // thrown if the method is not there. fall 
through the loop
+                       }
+                       catch (Throwable t) {
+                               // this should not happen, we are not executing 
any method code.
+                               throw new RuntimeException("Error while 
checking whether function is a lambda.", t);
                        }
                }
-               catch (Exception e) {
-                       return false;
-               }
+               
+               return false;
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
index c416765..6761c17 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
index 66bea7f..c91e6ab 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
@@ -48,6 +48,7 @@ import org.apache.flink.util.Visitor;
 /**
  * 
  */
+@SuppressWarnings("deprecation")
 public class BulkIterationBase<T> extends SingleInputOperator<T, T, 
AbstractRichFunction> implements IterationOperator {
        
        private static String DEFAULT_NAME = "<Unnamed Bulk Iteration>";

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
index ef00a46..6909dd4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.functions.GenericCollectorMap;
@@ -33,6 +32,7 @@ import 
org.apache.flink.api.common.operators.util.UserCodeWrapper;
  * 
  * @see GenericCollectorMap
  */
+@Deprecated
 public class CollectorMapOperatorBase<IN, OUT, FT extends 
GenericCollectorMap<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
        
        public CollectorMapOperatorBase(UserCodeWrapper<FT> udf, 
UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
index 7d6495b..2683583 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
@@ -48,6 +48,7 @@ import 
org.apache.flink.api.common.operators.base.JoinOperatorBase;
 /**
  * Convenience methods when dealing with {@link Operator}s.
  */
+@SuppressWarnings("deprecation")
 public class OperatorUtil {
        
        @SuppressWarnings("rawtypes")
@@ -126,7 +127,7 @@ public class OperatorUtil {
         * @param inputs
         *        all input contracts to this contract
         */
-       @SuppressWarnings({ "deprecation", "rawtypes", "unchecked" })
+       @SuppressWarnings({ "rawtypes", "unchecked" })
        public static void setInputs(final Operator<?> contract, final 
List<List<Operator>> inputs) {
                if (contract instanceof GenericDataSinkBase) {
                        if (inputs.size() != 1) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
index 30091ab..88809bb 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
@@ -42,6 +42,7 @@ import org.junit.Test;
 /**
  * Tests {@link OperatorUtil}.
  */
+@SuppressWarnings("deprecation")
 public class OperatorUtilTest {
        /**
         * Test {@link OperatorUtil#getContractClass(Class)}
@@ -115,13 +116,17 @@ public class OperatorUtilTest {
                assertEquals(GenericDataSourceBase.class, result);
        }
 
+       @SuppressWarnings("serial")
        static abstract class CoGrouper implements CoGroupFunction<IntValue, 
IntValue, IntValue> {}
 
+       @SuppressWarnings("serial")
        static abstract class Crosser implements CrossFunction<IntValue, 
IntValue, IntValue> {}
 
        static abstract class Mapper implements GenericCollectorMap<IntValue, 
IntValue> {}
 
+       @SuppressWarnings("serial")
        static abstract class Matcher implements FlatJoinFunction<IntValue, 
IntValue, IntValue> {}
 
+       @SuppressWarnings("serial")
        static abstract class Reducer implements GroupReduceFunction<IntValue, 
IntValue> {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index e7199f9..3d1238a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -50,9 +50,9 @@ import org.apache.flink.api.java.operators.MapOperator;
 import org.apache.flink.api.java.operators.ProjectOperator.Projection;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.ReduceOperator;
+import org.apache.flink.api.java.operators.SortedGrouping;
 import org.apache.flink.api.java.operators.UnionOperator;
 import org.apache.flink.api.java.operators.UnsortedGrouping;
-import org.apache.flink.api.java.record.functions.CrossFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
@@ -65,8 +65,8 @@ import org.apache.flink.types.TypeInformation;
  * A DataSet represents a collection of elements of the same type.<br/>
  * A DataSet can be transformed into another DataSet by applying a 
transformation as for example 
  * <ul>
- *   <li>{@link 
DataSet#map(org.apache.flink.api.java.functions.RichMapFunction)},</li>
- *   <li>{@link 
DataSet#reduce(org.apache.flink.api.java.functions.RichReduceFunction)},</li>
+ *   <li>{@link 
DataSet#map(org.apache.flink.api.common.functions.MapFunction)},</li>
+ *   <li>{@link 
DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)},</li>
  *   <li>{@link DataSet#join(DataSet)}, or</li>
  *   <li>{@link DataSet#coGroup(DataSet)}.</li>
  * </ul>
@@ -135,7 +135,7 @@ public abstract class DataSet<T> {
                if (mapper == null) {
                        throw new NullPointerException("Map function must not 
be null.");
                }
-               if (FunctionUtils.isSerializedLambdaFunction(mapper)) {
+               if (FunctionUtils.isLambdaFunction(mapper)) {
                        throw new UnsupportedLambdaExpressionException();
                }
                return new MapOperator<T, R>(this, mapper);
@@ -157,7 +157,7 @@ public abstract class DataSet<T> {
                if (flatMapper == null) {
                        throw new NullPointerException("FlatMap function must 
not be null.");
                }
-               if (FunctionUtils.isSerializedLambdaFunction(flatMapper)) {
+               if (FunctionUtils.isLambdaFunction(flatMapper)) {
                        throw new UnsupportedLambdaExpressionException();
                }
                return new FlatMapOperator<T, R>(this, flatMapper);
@@ -197,13 +197,13 @@ public abstract class DataSet<T> {
         * 
         * @param fieldIndexes The field indexes of the input tuples that are 
retained.
         *                                         The order of fields in the 
output tuple corresponds to the order of field indexes.
-        * @return A Projection that needs to be converted into a {@link 
ProjectOperator} to complete the 
+        * @return A Projection that needs to be converted into a {@link 
org.apache.flink.api.java.operators.ProjectOperator} to complete the 
         *           Project transformation by calling {@link 
Projection#types()}.
         * 
         * @see Tuple
         * @see DataSet
         * @see Projection
-        * @see ProjectOperator
+        * @see org.apache.flink.api.java.operators.ProjectOperator
         */
        public Projection<T> project(int... fieldIndexes) {
                return new Projection<T>(this, fieldIndexes);
@@ -303,7 +303,7 @@ public abstract class DataSet<T> {
                if (reducer == null) {
                        throw new NullPointerException("GroupReduce function 
must not be null.");
                }
-               if (FunctionUtils.isSerializedLambdaFunction(reducer)) {
+               if (FunctionUtils.isLambdaFunction(reducer)) {
                        throw new UnsupportedLambdaExpressionException();
                }
                return new GroupReduceOperator<T, R>(this, reducer);
@@ -366,17 +366,15 @@ public abstract class DataSet<T> {
         * <ul>
         *   <li>{@link UnsortedGrouping#sortGroup(int, 
org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}. 
         *   <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply 
an Aggregate transformation.
-        *   <li>{@link 
UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)}
 to apply a Reduce transformation.
-        *   <li>{@link 
UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)}
 to apply a GroupReduce transformation.
+        *   <li>{@link 
UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} 
to apply a Reduce transformation.
+        *   <li>{@link 
UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)}
 to apply a GroupReduce transformation.
         * </ul>
         *  
         * @param keyExtractor The KeySelector function which extracts the key 
values from the DataSet on which it is grouped. 
         * @return An UnsortedGrouping on which a transformation needs to be 
applied to obtain a transformed DataSet.
         * 
         * @see KeySelector
-        * @see Grouping
         * @see UnsortedGrouping
-        * @see SortedGrouping
         * @see AggregateOperator
         * @see ReduceOperator
         * @see org.apache.flink.api.java.operators.GroupReduceOperator
@@ -395,17 +393,15 @@ public abstract class DataSet<T> {
         * <ul>
         *   <li>{@link UnsortedGrouping#sortGroup(int, 
org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}. 
         *   <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply 
an Aggregate transformation.
-        *   <li>{@link 
UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)}
 to apply a Reduce transformation.
-        *   <li>{@link 
UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)}
 to apply a GroupReduce transformation.
+        *   <li>{@link 
UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} 
to apply a Reduce transformation.
+        *   <li>{@link 
UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)}
 to apply a GroupReduce transformation.
         * </ul> 
         * 
         * @param fields One or more field positions on which the DataSet will 
be grouped. 
         * @return A Grouping on which a transformation needs to be applied to 
obtain a transformed DataSet.
         * 
         * @see Tuple
-        * @see Grouping
         * @see UnsortedGrouping
-        * @see SortedGrouping
         * @see AggregateOperator
         * @see ReduceOperator
         * @see org.apache.flink.api.java.operators.GroupReduceOperator
@@ -424,17 +420,15 @@ public abstract class DataSet<T> {
         * <ul>
         *   <li>{@link UnsortedGrouping#sortGroup(int, 
org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}.
         *   <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply 
an Aggregate transformation.
-        *   <li>{@link 
UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)}
 to apply a Reduce transformation.
-        *   <li>{@link 
UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)}
 to apply a GroupReduce transformation.
+        *   <li>{@link 
UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} 
to apply a Reduce transformation.
+        *   <li>{@link 
UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)}
 to apply a GroupReduce transformation.
         * </ul>
         *
         * @param fields One or more field expressions on which the DataSet 
will be grouped.
         * @return A Grouping on which a transformation needs to be applied to 
obtain a transformed DataSet.
         *
         * @see Tuple
-        * @see Grouping
         * @see UnsortedGrouping
-        * @see SortedGrouping
         * @see AggregateOperator
         * @see ReduceOperator
         * @see org.apache.flink.api.java.operators.GroupReduceOperator
@@ -462,7 +456,6 @@ public abstract class DataSet<T> {
         * @return A JoinOperatorSets to continue the definition of the Join 
transformation.
         * 
         * @see JoinOperatorSets
-        * @see JoinOperator
         * @see DataSet
         */
        public <R> JoinOperatorSets<T, R> join(DataSet<R> other) {
@@ -484,7 +477,6 @@ public abstract class DataSet<T> {
         * @return A JoinOperatorSets to continue the definition of the Join 
transformation.
         * 
         * @see JoinOperatorSets
-        * @see JoinOperator
         * @see DataSet
         */
        public <R> JoinOperatorSets<T, R> joinWithTiny(DataSet<R> other) {
@@ -506,7 +498,6 @@ public abstract class DataSet<T> {
         * @return A JoinOperatorSet to continue the definition of the Join 
transformation.
         * 
         * @see JoinOperatorSets
-        * @see JoinOperator
         * @see DataSet
         */
        public <R> JoinOperatorSets<T, R> joinWithHuge(DataSet<R> other) {
@@ -570,7 +561,7 @@ public abstract class DataSet<T> {
         * second input being the second field of the tuple.
         * 
         * <p>
-        * Call {@link DefaultCross.with(CrossFunction)} to define a {@link 
CrossFunction} which is called for
+        * Call {@link 
org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(CrossFunction)}
 to define a {@link CrossFunction} which is called for
         * each pair of crossed elements. The CrossFunction returns exactly 
one element for each pair of input elements.<br/>
         * 
         * @param other The other DataSet with which this DataSet is crossed. 
@@ -599,14 +590,15 @@ public abstract class DataSet<T> {
         * second input being the second field of the tuple.
         *   
         * <p>
-        * Call {@link DefaultCross.with(CrossFunction)} to define a {@link 
CrossFunction} which is called for
+        * Call {@link 
DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to 
define a
+        * {@link org.apache.flink.api.common.functions.CrossFunction} which is 
called for
         * each pair of crossed elements. The CrossFunction returns exactly 
one element for each pair of input elements.<br/>
         * 
         * @param other The other DataSet with which this DataSet is crossed. 
         * @return A DefaultCross that returns a Tuple2 for each pair of 
crossed elements.
         * 
         * @see DefaultCross
-        * @see CrossFunction
+        * @see org.apache.flink.api.common.functions.CrossFunction
         * @see DataSet
         * @see Tuple2
         */
@@ -628,14 +620,15 @@ public abstract class DataSet<T> {
         * second input being the second field of the tuple.
         *   
         * <p>
-        * Call {@link DefaultCross.with(CrossFunction)} to define a {@link 
CrossFunction} which is called for
+        * Call {@link 
DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to 
define a
+        * {@link org.apache.flink.api.common.functions.CrossFunction} which is 
called for
         * each pair of crossed elements. The CrossFunction returns exactly 
one element for each pair of input elements.<br/>
         * 
         * @param other The other DataSet with which this DataSet is crossed. 
         * @return A DefaultCross that returns a Tuple2 for each pair of 
crossed elements.
         * 
         * @see DefaultCross
-        * @see CrossFunction
+        * @see org.apache.flink.api.common.functions.CrossFunction
         * @see DataSet
         * @see Tuple2
         */

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index ebd1422..af0ea52 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -112,7 +112,7 @@ public abstract class ExecutionEnvironment {
         * individually override this value to use a specific degree of 
parallelism via
         * {@link Operator#setParallelism(int)}. Other operations may need to 
run with a different
         * degree of parallelism - for example calling
-        * {@link 
DataSet#reduce(org.apache.flink.api.java.functions.RichReduceFunction)} over 
the entire
+        * {@link 
DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)} over the 
entire
         * set will insert eventually an operation that runs non-parallel 
(degree of parallelism of one).
         * 
         * @return The degree of parallelism used by operations, unless they 
override that value. This method

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
index 012ab57..0a83235 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
@@ -23,7 +23,7 @@ import java.util.Iterator;
 import org.apache.flink.util.Collector;
 
 /**
- * A variant of the {@link RichFlatMapFunction} that returns elements through 
an iterator, rather then
+ * A convenience variant of the {@link RichFlatMapFunction} that returns 
elements through an iterator, rather than
  * through a collector. In all other respects, it behaves exactly like the 
FlatMapFunction.
  * <p>
  * The function needs to be serializable, as defined in {@link 
java.io.Serializable}.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
index 8aaaf86..e78c31e 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCoGroupFunction.java
@@ -22,29 +22,14 @@ import java.util.Iterator;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.util.Collector;
 
 /**
- * The abstract base class for CoGroup functions. CoGroup functions combine 
two data sets by first grouping each data set
- * after a key and then "joining" the groups by calling this function with the 
two sets for each key. 
- * If a key is present in only one of the two inputs, it may be that one of 
the groups is empty.
- * <p>
- * The basic syntax for using CoGoup on two data sets is as follows:
- * <pre><blockquote>
- * DataSet<X> set1 = ...;
- * DataSet<Y> set2 = ...;
- * 
- * 
set1.coGroup(set2).where(<key-definition>).equalTo(<key-definition>).with(new 
MyCoGroupFunction());
- * </blockquote></pre>
- * <p>
- * {@code set1} is here considered the first input, {@code set2} the second 
input.
- * The keys can be defined through tuple field positions or key extractors.
- * See {@link org.apache.flink.api.java.operators.Keys} for details.
- * <p>
- * Some keys may only be contained in one of the two original data sets. In 
that case, the CoGroup function is invoked
- * with in empty input for the side of the data set that did not contain 
elements with that specific key.
- * <p>
- * All functions need to be serializable, as defined in {@link 
java.io.Serializable}.
+ * Rich variant of the {@link CoGroupFunction}. As a {@link RichFunction}, it 
gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides 
setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
  * 
  * @param <IN1> The type of the elements in the first input.
  * @param <IN2> The type of the elements in the second input.
@@ -54,20 +39,6 @@ public abstract class RichCoGroupFunction<IN1, IN2, OUT> 
extends AbstractRichFun
 
        private static final long serialVersionUID = 1L;
        
-       
-       /**
-        * The core method of the CoGroupFunction. This method is called for 
each pair of groups that have the same
-        * key. The elements of the groups are returned by the respective 
iterators.
-        * 
-        * It is possible that one of the two groups is empty, in which case 
the respective iterator has no elements.
-        * 
-        * @param first The group from the first input.
-        * @param second The group from the second input.
-        * @param out The collector through which to return the result elements.
-        * 
-        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
-        *                   to fail and may trigger recovery.
-        */
        @Override
        public abstract void coGroup(Iterator<IN1> first, Iterator<IN2> second, 
Collector<OUT> out) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java
index a4e1248..58be279 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java
@@ -20,26 +20,13 @@ package org.apache.flink.api.java.functions;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.CrossFunction;
-
+import org.apache.flink.api.common.functions.RichFunction;
 
 /**
- * The abstract base class for Cross functions. Cross functions build a 
Cartesian produce of their inputs
- * and call the function or each pair of elements.
- * They are a means of convenience and can be used to directly produce 
manipulate the
- * pair of elements, instead of having the operator build 2-tuples, and then 
using a
- * MapFunction over those 2-tuples.
- * <p>
- * The basic syntax for using Cross on two data sets is as follows:
- * <pre><blockquote>
- * DataSet<X> set1 = ...;
- * DataSet<Y> set2 = ...;
- * 
- * set1.cross(set2).with(new MyCrossFunction());
- * </blockquote></pre>
- * <p>
- * {@code set1} is here considered the first input, {@code set2} the second 
input.
- * <p>
- * All functions need to be serializable, as defined in {@link 
java.io.Serializable}.
+ * Rich variant of the {@link CrossFunction}. As a {@link RichFunction}, it 
gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides 
setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
  * 
  * @param <IN1> The type of the elements in the first input.
  * @param <IN2> The type of the elements in the second input.
@@ -48,19 +35,7 @@ import org.apache.flink.api.common.functions.CrossFunction;
 public abstract class RichCrossFunction<IN1, IN2, OUT> extends 
AbstractRichFunction implements CrossFunction<IN1, IN2, OUT> {
        
        private static final long serialVersionUID = 1L;
-       
 
-       /**
-        * The core method of the cross operation. The method will be invoked 
for each pair of elements
-        * in the Cartesian product.
-        * 
-        * @param first The element from the first input.
-        * @param second The element from the second input.
-        * @return The result element.
-        * 
-        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
-        *                   to fail and may trigger recovery.
-        */
        @Override
        public abstract OUT cross(IN1 first, IN2 second) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java
index e3baa74..9057a0f 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java
@@ -20,19 +20,13 @@ package org.apache.flink.api.java.functions;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 
 /**
- * The abstract base class for Filter functions. A filter function take 
elements and evaluates a
- * predicate on them to decide whether to keep the element, or to discard it.
- * <p>
- * The basic syntax for using a FilterFunction is as follows:
- * <pre><blockquote>
- * DataSet<X> input = ...;
- * 
- * DataSet<X> result = input.filter(new MyFilterFunction());
- * </blockquote></pre>
- * <p>
- * Like all functions, the FilterFunction needs to be serializable, as defined 
in {@link java.io.Serializable}.
+ * Rich variant of the {@link FilterFunction}. As a {@link RichFunction}, it 
gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides 
setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
  * 
  * @param <T> The type of the filtered elements.
  */
@@ -40,18 +34,6 @@ public abstract class RichFilterFunction<T> extends 
AbstractRichFunction impleme
        
        private static final long serialVersionUID = 1L;
        
-       /**
-        * The core method of the FilterFunction. The method is called for each 
element in the input,
-        * and determines whether the element should be kept or filtered out. 
If the method returns true,
-        * the element passes the filter and is kept, if the method returns 
false, the element is
-        * filtered out.
-        * 
-        * @param value The input value to be filtered.
-        * @return Flag to indicate whether to keep the value (true) or to 
discard it (false).
-        * 
-        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
-        *                   to fail and may trigger recovery.
-        */
        @Override
        public abstract boolean filter(T value) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
index 8c326c6..97c15ff 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
@@ -21,10 +21,19 @@ package org.apache.flink.api.java.functions;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.util.Collector;
 
 import java.util.Iterator;
 
+/**
+ * Rich variant of the {@link FlatCombineFunction}. As a {@link RichFunction}, 
it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides 
setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <T> The data type of the elements to be combined.
+ */
 public abstract class RichFlatCombineFunction<T> extends AbstractRichFunction 
implements FlatCombineFunction<T> {
 
        private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bc89e911/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java
index 15b4539..6918364 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java
@@ -20,36 +20,14 @@ package org.apache.flink.api.java.functions;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.util.Collector;
 
 /**
- * The abstract base class for Join functions. Join functions combine two data 
sets by joining their
- * elements on specified keys and calling this function with each pair of 
joining elements.
- * By default, this follows strictly the semantics of an "inner join" in SQL.
- * the semantics are those of an "inner join", meaning that elements are 
filtered out
- * if their key is not contained in the other data set.
- * <p>
- * Per the semantics of an inner join, the function is 
- * <p>
- * The basic syntax for using Join on two data sets is as follows:
- * <pre><blockquote>
- * DataSet<X> set1 = ...;
- * DataSet<Y> set2 = ...;
- * 
- * set1.join(set2).where(<key-definition>).equalTo(<key-definition>).with(new 
MyJoinFunction());
- * </blockquote></pre>
- * <p>
- * {@code set1} is here considered the first input, {@code set2} the second 
input.
- * The keys can be defined through tuple field positions or key extractors.
- * See {@link org.apache.flink.api.java.operators.Keys} for details.
- * <p>
- * The Join function is actually not a necessary part of a join operation. If 
no JoinFunction is provided,
- * the result of the operation is a sequence of Tuple2, where the elements in 
the tuple are those that
- * the JoinFunction would have been invoked with.
- * <P>
- * Note: You can use a {@link RichCoGroupFunction} to perform an outer join.
- * <p>
- * All functions need to be serializable, as defined in {@link 
java.io.Serializable}.
+ * Rich variant of the {@link FlatJoinFunction}. As a {@link RichFunction}, it 
gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides 
setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
  * 
  * @param <IN1> The type of the elements in the first input.
  * @param <IN2> The type of the elements in the second input.
@@ -59,17 +37,6 @@ public abstract class RichFlatJoinFunction<IN1, IN2, OUT> 
extends AbstractRichFu
 
        private static final long serialVersionUID = 1L;
 
-       /**
-        * The user-defined method for performing transformations after a join.
-        * The method is called with matching pairs of elements from the inputs.
-        * 
-        * @param first The element from first input.
-        * @param second The element from second input.
-        * @return The resulting element.
-        * 
-        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
-        *                   to fail and may trigger recovery.
-        */
        @Override
        public abstract void join(IN1 first, IN2 second, Collector<OUT> out) 
throws Exception;
 }

Reply via email to