http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java
new file mode 100644
index 0000000..a4e1248
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichCrossFunction.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.CrossFunction;
+
+
+/**
+ * The abstract base class for Cross functions. Cross functions build a 
Cartesian product of their inputs
+ * and call the function for each pair of elements.
+ * They are a means of convenience and can be used to directly 
manipulate the
+ * pair of elements, instead of having the operator build 2-tuples, and then 
using a
+ * MapFunction over those 2-tuples.
+ * <p>
+ * The basic syntax for using Cross on two data sets is as follows:
+ * <pre><blockquote>
+ * DataSet<X> set1 = ...;
+ * DataSet<Y> set2 = ...;
+ * 
+ * set1.cross(set2).with(new MyCrossFunction());
+ * </blockquote></pre>
+ * <p>
+ * {@code set1} is here considered the first input, {@code set2} the second 
input.
+ * <p>
+ * All functions need to be serializable, as defined in {@link 
java.io.Serializable}.
+ * 
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
+ */
+public abstract class RichCrossFunction<IN1, IN2, OUT> extends 
AbstractRichFunction implements CrossFunction<IN1, IN2, OUT> {
+       
+       private static final long serialVersionUID = 1L;
+       
+
+       /**
+        * The core method of the cross operation. The method will be invoked 
for each pair of elements
+        * in the Cartesian product.
+        * 
+        * @param first The element from the first input.
+        * @param second The element from the second input.
+        * @return The result element.
+        * 
+        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+        *                   to fail and may trigger recovery.
+        */
+       @Override
+       public abstract OUT cross(IN1 first, IN2 second) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java
new file mode 100644
index 0000000..e3baa74
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFilterFunction.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * The abstract base class for Filter functions. A filter function takes 
elements and evaluates a
+ * predicate on them to decide whether to keep the element, or to discard it.
+ * <p>
+ * The basic syntax for using a FilterFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ * 
+ * DataSet<X> result = input.filter(new MyFilterFunction());
+ * </blockquote></pre>
+ * <p>
+ * Like all functions, the FilterFunction needs to be serializable, as defined 
in {@link java.io.Serializable}.
+ * 
+ * @param <T> The type of the filtered elements.
+ */
+public abstract class RichFilterFunction<T> extends AbstractRichFunction 
implements FilterFunction<T> {
+       
+       private static final long serialVersionUID = 1L;
+       
+       /**
+        * The core method of the FilterFunction. The method is called for each 
element in the input,
+        * and determines whether the element should be kept or filtered out. 
If the method returns true,
+        * the element passes the filter and is kept, if the method returns 
false, the element is
+        * filtered out.
+        * 
+        * @param value The input value to be filtered.
+        * @return Flag to indicate whether to keep the value (true) or to 
discard it (false).
+        * 
+        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+        *                   to fail and may trigger recovery.
+        */
+       @Override
+       public abstract boolean filter(T value) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
new file mode 100644
index 0000000..8c326c6
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatCombineFunction.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.Iterator;
+
+public abstract class RichFlatCombineFunction<T> extends AbstractRichFunction 
implements FlatCombineFunction<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public abstract void combine(Iterator<T> values, Collector<T> out) 
throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java
new file mode 100644
index 0000000..15b4539
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatJoinFunction.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * The abstract base class for Join functions. Join functions combine two data 
sets by joining their
+ * elements on specified keys and calling this function with each pair of 
joining elements.
+ * By default, this follows strictly the semantics of an "inner join" in SQL:
+ * the semantics are those of an "inner join", meaning that elements are 
filtered out
+ * if their key is not contained in the other data set.
+ * <p>
+ * Per the semantics of an inner join, the function is only invoked with matching pairs of elements.
+ * <p>
+ * The basic syntax for using Join on two data sets is as follows:
+ * <pre><blockquote>
+ * DataSet<X> set1 = ...;
+ * DataSet<Y> set2 = ...;
+ * 
+ * set1.join(set2).where(<key-definition>).equalTo(<key-definition>).with(new 
MyJoinFunction());
+ * </blockquote></pre>
+ * <p>
+ * {@code set1} is here considered the first input, {@code set2} the second 
input.
+ * The keys can be defined through tuple field positions or key extractors.
+ * See {@link org.apache.flink.api.java.operators.Keys} for details.
+ * <p>
+ * The Join function is actually not a necessary part of a join operation. If 
no JoinFunction is provided,
+ * the result of the operation is a sequence of Tuple2, where the elements in 
the tuple are those that
+ * the JoinFunction would have been invoked with.
+ * <p>
+ * Note: You can use a {@link RichCoGroupFunction} to perform an outer join.
+ * <p>
+ * All functions need to be serializable, as defined in {@link 
java.io.Serializable}.
+ * 
+ * @param <IN1> The type of the elements in the first input.
+ * @param <IN2> The type of the elements in the second input.
+ * @param <OUT> The type of the result elements.
+ */
+public abstract class RichFlatJoinFunction<IN1, IN2, OUT> extends 
AbstractRichFunction implements FlatJoinFunction<IN1, IN2, OUT> {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * The user-defined method for performing transformations after a join.
+        * The method is called with matching pairs of elements from the inputs.
+        * 
+        * @param first The element from first input.
+        * @param second The element from second input.
+        * @return The resulting element.
+        * 
+        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+        *                   to fail and may trigger recovery.
+        */
+       @Override
+       public abstract void join(IN1 first, IN2 second, Collector<OUT> out) 
throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java
new file mode 100644
index 0000000..2293b5e
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichFlatMapFunction.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * The abstract base class for flatMap functions. FlatMap functions take 
elements and transform them,
+ * into zero, one, or more elements. Typical applications can be splitting 
elements, or unnesting lists
+ * and arrays. Operations that produce strictly one result element 
per input element can also
+ * use the {@link RichMapFunction}.
+ * <p>
+ * The basic syntax for using a FlatMapFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ * 
+ * DataSet<Y> result = input.flatMap(new MyFlatMapFunction());
+ * </blockquote></pre>
+ * <p>
+ * Like all functions, the FlatMapFunction needs to be serializable, as 
defined in {@link java.io.Serializable}.
+ * 
+ * @param <IN> Type of the input elements.
+ * @param <OUT> Type of the returned elements.
+ */
+public abstract class RichFlatMapFunction<IN, OUT> extends 
AbstractRichFunction implements FlatMapFunction<IN, OUT> {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * The core method of the FlatMapFunction. Takes an element from the 
input data set and transforms
+        * it into zero, one, or more elements.
+        * 
+        * @param value The input value.
+        * @param out The collector for emitting result values.
+        * 
+        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+        *                   to fail and may trigger recovery.
+        */
+       @Override
+       public abstract void flatMap(IN value, Collector<OUT> out) throws 
Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
new file mode 100644
index 0000000..eb75f53
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichGroupReduceFunction.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.Iterator;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * The abstract base class for group reduce functions. Group reduce functions 
process groups of elements.
+ * They may aggregate them to a single value, or produce multiple result 
values for each group.
+ * <p>
+ * For a reduce function that works incrementally by combining always two 
elements, see 
+ * {@link RichReduceFunction}, called via {@link 
org.apache.flink.api.java.DataSet#reduce(RichReduceFunction)}.
+ * <p>
+ * The basic syntax for using a grouped GroupReduceFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ * 
+ * DataSet<X> result = input.groupBy(<key-definition>).reduceGroup(new 
MyGroupReduceFunction());
+ * </blockquote></pre>
+ * <p>
+ * GroupReduceFunctions may be "combinable", in which case they can pre-reduce 
partial groups in order to
+ * reduce the data volume early. See the {@link #combine(Iterator, Collector)} 
function for details.
+ * <p>
+ * Like all functions, the GroupReduceFunction needs to be serializable, as 
defined in {@link java.io.Serializable}.
+ * 
+ * @param <IN> Type of the elements that this function processes.
+ * @param <OUT> The type of the elements returned by the user-defined function.
+ */
+public abstract class RichGroupReduceFunction<IN, OUT> extends 
AbstractRichFunction implements GroupReduceFunction<IN, OUT>, 
FlatCombineFunction<IN> {
+       
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Core method of the reduce function. It is called once per group of 
elements. If the reducer
+        * is not grouped, then the entire data set is considered one group.
+        * 
+        * @param values The iterator returning the group of values to be 
reduced.
+        * @param out The collector to emit the returned values.
+        * 
+        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+        *                   to fail and may trigger recovery.
+        */
+       @Override
+       public abstract void reduce(Iterator<IN> values, Collector<OUT> out) 
throws Exception;
+       
+       /**
+        * The combine method pre-reduces elements. It may be called on 
subsets of the data
+        * before the actual reduce function. This is often helpful to lower 
data volume prior
+        * to reorganizing the data in an expensive way, as might be required 
for the final
+        * reduce function.
+        * <p>
+        * This method is only ever invoked when the subclass of {@link 
RichGroupReduceFunction}
+        * adds the {@link Combinable} annotation, or if the <i>combinable</i> 
flag is set when defining
+        * the <i>reduceGroup</i> operation via
+        * {@link 
org.apache.flink.api.java.operators.GroupReduceOperator#setCombinable(boolean)}.
+        * <p>
+        * Since the reduce function will be called on the result of this 
method, it is important that this
+        * method returns the same data type as it consumes. By default, this 
method only calls the
+        * {@link #reduce(Iterator, Collector)} method. If the behavior in the 
pre-reducing is different
+        * from the final reduce function (for example because the reduce 
function changes the data type),
+        * this method must be overwritten, or the execution will fail.
+        * 
+        * @param values The iterator returning the group of values to be 
reduced.
+        * @param out The collector to emit the returned values.
+        * 
+        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+        *                   to fail and may trigger recovery.
+        */
+       @Override
+       public void combine(Iterator<IN> values, Collector<IN> out) throws 
Exception {
+               @SuppressWarnings("unchecked")
+               Collector<OUT> c = (Collector<OUT>) out;
+               reduce(values, c);
+       }
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * This annotation can be added to classes that extend {@link 
RichGroupReduceFunction}, in order to mark
+        * them as "combinable". The system may call the {@link 
RichGroupReduceFunction#combine(Iterator, Collector)}
+        * method on such functions, to pre-reduce the data before transferring 
it over the network to
+        * the actual group reduce operation.
+        * <p>
+        * Marking combinable functions as such is in general beneficial for 
performance.
+        */
+       @Retention(RetentionPolicy.RUNTIME)
+       @Target(ElementType.TYPE)
+       public static @interface Combinable {};
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java
new file mode 100644
index 0000000..7eaf44c
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichJoinFunction.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+
+public abstract class RichJoinFunction<IN1,IN2,OUT> extends 
AbstractRichFunction implements JoinFunction<IN1,IN2,OUT> {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public abstract OUT join(IN1 first, IN2 second) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java
new file mode 100644
index 0000000..54de7d4
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichMapFunction.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+
+/**
+ * The abstract base class for Map functions. Map functions take elements and 
transform them,
+ * element wise. A Map function always produces a single result element for 
each input element.
+ * Typical applications are parsing elements, converting data types, or 
projecting out fields.
+ * Operations that produce multiple result elements from a single input 
element can be implemented
+ * using the {@link RichFlatMapFunction}.
+ * <p>
+ * The basic syntax for using a MapFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ * 
+ * DataSet<Y> result = input.map(new MyMapFunction());
+ * </blockquote></pre>
+ * <p>
+ * Like all functions, the MapFunction needs to be serializable, as defined in 
{@link java.io.Serializable}.
+ * 
+ * @param <IN> Type of the input elements.
+ * @param <OUT> Type of the returned elements.
+ */
+public abstract class RichMapFunction<IN, OUT> extends AbstractRichFunction 
implements MapFunction<IN, OUT> {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * The core method of the MapFunction. Takes an element from the input 
data set and transforms
+        * it into another element.
+        * 
+        * @param value The input value.
+        * @return The value produced by the map function from the input value.
+        * 
+        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+        *                   to fail and may trigger recovery.
+        */
+       @Override
+       public abstract OUT map(IN value) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java
new file mode 100644
index 0000000..35cb392
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/RichReduceFunction.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+
+/**
+ * The abstract base class for Reduce functions. Reduce functions combine 
groups of elements to
+ * a single value, by taking always two elements and combining them into one. 
Reduce functions
+ * may be used on entire data sets, or on grouped data sets. In the latter 
case, each group is reduced
+ * individually.
+ * <p>
+ * For reduce functions that work on an entire group at the same time (such 
as the 
+ * MapReduce/Hadoop-style reduce), see {@link RichGroupReduceFunction}, called 
via
+ * {@link 
org.apache.flink.api.java.DataSet#reduceGroup(RichGroupReduceFunction)}. In the 
general case,
+ * ReduceFunctions are considered faster, because they allow the system to use 
hash-based
+ * execution strategies.
+ * <p>
+ * The basic syntax for using a grouped ReduceFunction is as follows:
+ * <pre><blockquote>
+ * DataSet<X> input = ...;
+ * 
+ * DataSet<X> result = input.groupBy(<key-definition>).reduce(new 
MyReduceFunction());
+ * </blockquote></pre>
+ * <p>
+ * Like all functions, the ReduceFunction needs to be serializable, as defined 
in {@link java.io.Serializable}.
+ * 
+ * @param <T> Type of the elements that this function processes.
+ */
+public abstract class RichReduceFunction<T> extends AbstractRichFunction 
implements ReduceFunction<T> {
+       
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * The core method of the ReduceFunction, combining two values into one 
value of the same type.
+        * The reduce function is consecutively applied to all values of a 
group until only a single value remains.
+        *
+        * @param value1 The first value to combine.
+        * @param value2 The second value to combine.
+        * @return The combined value of both input values.
+        *
+        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+        *                   to fail and may trigger recovery.
+        */
+       public abstract T reduce(T value1, T value2) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/functions/UnsupportedLambdaExpressionException.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/UnsupportedLambdaExpressionException.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/UnsupportedLambdaExpressionException.java
new file mode 100644
index 0000000..ccc4685
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/UnsupportedLambdaExpressionException.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.InvalidProgramException;
+
+public class UnsupportedLambdaExpressionException extends 
InvalidProgramException {
+
+       private static final long serialVersionUID = -1721898801986321010L;
+
+       public UnsupportedLambdaExpressionException() {
+               super("Java 8 lambda expressions are currently supported only 
in filter and reduce user-defined functions.");
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
index ca6ed94..80a5fa0 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
@@ -24,7 +24,7 @@ import java.util.List;
 
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
@@ -32,8 +32,8 @@ import 
org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.java.aggregation.AggregationFunction;
 import org.apache.flink.api.java.aggregation.AggregationFunctionFactory;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction.Combinable;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
@@ -151,7 +151,7 @@ public class AggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Aggregate
 
        @SuppressWarnings("unchecked")
        @Override
-       protected 
org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<IN, IN, 
GenericGroupReduce<IN, IN>> translateToDataFlow(Operator<IN> input) {
+       protected 
org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<IN, IN, 
GroupReduceFunction<IN, IN>> translateToDataFlow(Operator<IN> input) {
                
                // sanity check
                if (this.aggregationFunctions.isEmpty() || 
this.aggregationFunctions.size() != this.fields.size()) {
@@ -174,7 +174,7 @@ public class AggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Aggregate
                
                
                @SuppressWarnings("rawtypes")
-               GroupReduceFunction<IN, IN> function = new 
AggregatingUdf(aggFunctions, fields);
+               RichGroupReduceFunction<IN, IN> function = new 
AggregatingUdf(aggFunctions, fields);
                
                
                String name = getName() != null ? getName() : 
genName.toString();
@@ -183,8 +183,8 @@ public class AggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Aggregate
                if (this.grouping == null) {
                        // non grouped aggregation
                        UnaryOperatorInformation<IN, IN> operatorInfo = new 
UnaryOperatorInformation<IN, IN>(getInputType(), getResultType());
-                       GroupReduceOperatorBase<IN, IN, GenericGroupReduce<IN, 
IN>> po =
-                                       new GroupReduceOperatorBase<IN, IN, 
GenericGroupReduce<IN, IN>>(function, operatorInfo, new int[0], name);
+                       GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, 
IN>> po =
+                                       new GroupReduceOperatorBase<IN, IN, 
GroupReduceFunction<IN, IN>>(function, operatorInfo, new int[0], name);
                        
                        po.setCombinable(true);
                        
@@ -200,8 +200,8 @@ public class AggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Aggregate
                        // grouped aggregation
                        int[] logicalKeyPositions = 
this.grouping.getKeys().computeLogicalKeyPositions();
                        UnaryOperatorInformation<IN, IN> operatorInfo = new 
UnaryOperatorInformation<IN, IN>(getInputType(), getResultType());
-                       GroupReduceOperatorBase<IN, IN, GenericGroupReduce<IN, 
IN>> po =
-                                       new GroupReduceOperatorBase<IN, IN, 
GenericGroupReduce<IN, IN>>(function, operatorInfo, logicalKeyPositions, name);
+                       GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, 
IN>> po =
+                                       new GroupReduceOperatorBase<IN, IN, 
GroupReduceFunction<IN, IN>>(function, operatorInfo, logicalKeyPositions, name);
                        
                        po.setCombinable(true);
                        
@@ -245,7 +245,7 @@ public class AggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Aggregate
        // 
--------------------------------------------------------------------------------------------
        
        @Combinable
-       public static final class AggregatingUdf<T extends Tuple> extends 
GroupReduceFunction<T, T> {
+       public static final class AggregatingUdf<T extends Tuple> extends 
RichGroupReduceFunction<T, T> {
                private static final long serialVersionUID = 1L;
                
                private final int[] fieldPositions;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 8748556..89c3334 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -21,8 +21,9 @@ package org.apache.flink.api.java.operators;
 import java.security.InvalidParameterException;
 
 import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
@@ -30,8 +31,8 @@ import 
org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.DeltaIteration.SolutionSetPlaceHolder;
-import org.apache.flink.api.java.functions.CoGroupFunction;
 import org.apache.flink.api.java.functions.KeySelector;
+import 
org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 import org.apache.flink.api.java.operators.Keys.FieldPositionKeys;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import 
org.apache.flink.api.java.operators.translation.PlanUnwrappingCoGroupOperator;
@@ -123,8 +124,8 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                        int[] logicalKeyPositions1 = 
keys1.computeLogicalKeyPositions();
                        int[] logicalKeyPositions2 = 
keys2.computeLogicalKeyPositions();
                        
-                       CoGroupOperatorBase<I1, I2, OUT, GenericCoGrouper<I1, 
I2, OUT>> po =
-                                       new CoGroupOperatorBase<I1, I2, OUT, 
GenericCoGrouper<I1, I2, OUT>>(
+                       CoGroupOperatorBase<I1, I2, OUT, CoGroupFunction<I1, 
I2, OUT>> po =
+                                       new CoGroupOperatorBase<I1, I2, OUT, 
CoGroupFunction<I1, I2, OUT>>(
                                                        function, new 
BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), 
getResultType()),
                                                        logicalKeyPositions1, 
logicalKeyPositions2, name);
                        
@@ -199,10 +200,10 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                final KeyExtractingMapper<I1, K> extractor1 = new 
KeyExtractingMapper<I1, K>(keys1.getKeyExtractor());
                final KeyExtractingMapper<I2, K> extractor2 = new 
KeyExtractingMapper<I2, K>(keys2.getKeyExtractor());
                
-               final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, 
Tuple2<K, I1>>> keyMapper1 =
-                               new MapOperatorBase<I1, Tuple2<K, I1>, 
GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, 
Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
-               final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, 
Tuple2<K, I2>>> keyMapper2 =
-                               new MapOperatorBase<I2, Tuple2<K, I2>, 
GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, 
Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
+               final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, 
Tuple2<K, I1>>> keyMapper1 =
+                               new MapOperatorBase<I1, Tuple2<K, I1>, 
MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, 
Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
+               final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, 
Tuple2<K, I2>>> keyMapper2 =
+                               new MapOperatorBase<I2, Tuple2<K, I2>, 
MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, 
Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
                final PlanUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup = 
new PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>(function, keys1, keys2, name, 
outputType, typeInfoWithKey1, typeInfoWithKey2);
 
                cogroup.setFirstInput(keyMapper1);
@@ -236,10 +237,10 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                final TupleKeyExtractingMapper<I1, K> extractor1 = new 
TupleKeyExtractingMapper<I1, K>(logicalKeyPositions1[0]);
                final KeyExtractingMapper<I2, K> extractor2 = new 
KeyExtractingMapper<I2, K>(keys2.getKeyExtractor());
 
-               final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, 
Tuple2<K, I1>>> keyMapper1 =
-                               new MapOperatorBase<I1, Tuple2<K, I1>, 
GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, 
Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
-               final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, 
Tuple2<K, I2>>> keyMapper2 =
-                               new MapOperatorBase<I2, Tuple2<K, I2>, 
GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, 
Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
+               final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, 
Tuple2<K, I1>>> keyMapper1 =
+                               new MapOperatorBase<I1, Tuple2<K, I1>, 
MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, 
Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
+               final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, 
Tuple2<K, I2>>> keyMapper2 =
+                               new MapOperatorBase<I2, Tuple2<K, I2>, 
MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, 
Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
                
                final PlanUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup = 
new PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>(function, 
logicalKeyPositions1, keys2, name, outputType, typeInfoWithKey1, 
typeInfoWithKey2);
 
@@ -274,10 +275,10 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                final KeyExtractingMapper<I1, K> extractor1 = new 
KeyExtractingMapper<I1, K>(keys1.getKeyExtractor());
                final TupleKeyExtractingMapper<I2, K> extractor2 = new 
TupleKeyExtractingMapper<I2, K>(logicalKeyPositions2[0]);
 
-               final MapOperatorBase<I1, Tuple2<K, I1>, GenericMap<I1, 
Tuple2<K, I1>>> keyMapper1 =
-                               new MapOperatorBase<I1, Tuple2<K, I1>, 
GenericMap<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, 
Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
-               final MapOperatorBase<I2, Tuple2<K, I2>, GenericMap<I2, 
Tuple2<K, I2>>> keyMapper2 =
-                               new MapOperatorBase<I2, Tuple2<K, I2>, 
GenericMap<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, 
Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
+               final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, 
Tuple2<K, I1>>> keyMapper1 =
+                               new MapOperatorBase<I1, Tuple2<K, I1>, 
MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, 
Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
+               final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, 
Tuple2<K, I2>>> keyMapper2 =
+                               new MapOperatorBase<I2, Tuple2<K, I2>, 
MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, 
Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
                
                final PlanUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup = 
new PlanUnwrappingCoGroupOperator<I1, I2, OUT, K>(function, keys1, 
logicalKeyPositions2, name, outputType, typeInfoWithKey1, typeInfoWithKey2);
 
@@ -407,7 +408,7 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                         * @param field0 The first index of the Tuple fields of 
the second co-grouped DataSets that should be used as key
                         * @param fields The indexes of the Tuple fields of the 
second co-grouped DataSet that should be used as keys.
                         * @return An incomplete CoGroup transformation. 
-                        *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(CoGroupFunction)}
 to finalize the CoGroup transformation. 
+                        *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.java.functions.RichCoGroupFunction)}
 to finalize the CoGroup transformation.
                         */
                        public CoGroupOperatorWithoutFunction equalTo(int 
field0, int... fields) {
                                int[] actualFields = new int[fields.length + 1];
@@ -423,7 +424,7 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                         * @param field0 The first field of the second 
co-grouped DataSets that should be used as key
+                        * @param fields The fields of the second co-grouped 
DataSets that should be used as keys.
                         * @return An incomplete CoGroup transformation.
-                        *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(CoGroupFunction)}
 to finalize the CoGroup transformation.
+                        *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.java.functions.RichCoGroupFunction)}
 to finalize the CoGroup transformation.
                         */
                        public CoGroupOperatorWithoutFunction equalTo(String 
field0, String... fields) {
                                String[] actualFields = new 
String[fields.length + 1];
@@ -439,7 +440,7 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                         * 
                         * @param keyExtractor The KeySelector function which 
extracts the key values from the second DataSet on which it is grouped.
                         * @return An incomplete CoGroup transformation. 
-                        *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(CoGroupFunction)}
 to finalize the CoGroup transformation. 
+                        *           Call {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.java.functions.RichCoGroupFunction)}
 to finalize the CoGroup transformation.
                         */
                        public <K> CoGroupOperatorWithoutFunction 
equalTo(KeySelector<I2, K> keyExtractor) {
                                return createCoGroupOperator(new 
Keys.SelectorFunctionKeys<I2, K>(keyExtractor, input2.getType()));
@@ -447,8 +448,8 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
 
                        /**
                         * Intermediate step of a CoGroup transformation. <br/>
-                        * To continue the CoGroup transformation, provide a 
{@link CoGroupFunction} by calling 
-                        * {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(CoGroupFunction)}.
+                        * To continue the CoGroup transformation, provide a 
{@link org.apache.flink.api.java.functions.RichCoGroupFunction} by calling
+                        * {@link 
org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.java.functions.RichCoGroupFunction)}.
                         *
                         */
                        private CoGroupOperatorWithoutFunction 
createCoGroupOperator(Keys<I2> keys2) {
@@ -501,19 +502,22 @@ public class CoGroupOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OU
                                }
 
                                /**
-                                * Finalizes a CoGroup transformation by 
applying a {@link CoGroupFunction} to groups of elements with identical 
keys.<br/>
+                                * Finalizes a CoGroup transformation by 
applying a {@link org.apache.flink.api.java.functions.RichCoGroupFunction} to 
groups of elements with identical keys.<br/>
+                                * Each CoGroupFunction call returns an 
arbitrary number of elements. 
                                 * 
                                 * @param function The CoGroupFunction that is 
called for all groups of elements with identical keys.
                                 * @return An CoGroupOperator that represents 
the co-grouped result DataSet.
                                 * 
-                                * @see CoGroupFunction
+                                * @see 
org.apache.flink.api.java.functions.RichCoGroupFunction
                                 * @see DataSet
                                 */
                                public <R> CoGroupOperator<I1, I2, R> 
with(CoGroupFunction<I1, I2, R> function) {
                                        if (function == null) {
                                                throw new 
NullPointerException("CoGroup function must not be null.");
                                        }
+                                       if 
(FunctionUtils.isSerializedLambdaFunction(function)) {
+                                               throw new 
UnsupportedLambdaExpressionException();
+                                       }
                                        TypeInformation<R> returnType = 
TypeExtractor.getCoGroupReturnTypes(function, input1.getType(), 
input2.getType());
                                        return new CoGroupOperator<I1, I2, 
R>(input1, input2, keys1, keys2, function, returnType);
                                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
index 036d292..d1e99d6 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
@@ -21,14 +21,15 @@ package org.apache.flink.api.java.operators;
 import java.util.Arrays;
 
 import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.GenericCrosser;
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.base.CrossOperatorBase;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.functions.CrossFunction;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
+import 
org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.types.TypeInformation;
@@ -71,12 +72,12 @@ public class CrossOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OUT,
        }
 
        @Override
-       protected 
org.apache.flink.api.common.operators.base.CrossOperatorBase<I1, I2, OUT, 
GenericCrosser<I1,I2,OUT>> translateToDataFlow(Operator<I1> input1, 
Operator<I2> input2) {
+       protected 
org.apache.flink.api.common.operators.base.CrossOperatorBase<I1, I2, OUT, 
CrossFunction<I1,I2,OUT>> translateToDataFlow(Operator<I1> input1, Operator<I2> 
input2) {
                
                String name = getName() != null ? getName() : 
function.getClass().getName();
                // create operator
-               CrossOperatorBase<I1, I2, OUT, GenericCrosser<I1, I2, OUT>> po =
-                               new CrossOperatorBase<I1, I2, OUT, 
GenericCrosser<I1, I2, OUT>>(function, new BinaryOperatorInformation<I1, I2, 
OUT>(getInput1Type(), getInput2Type(), getResultType()), name);
+               CrossOperatorBase<I1, I2, OUT, CrossFunction<I1, I2, OUT>> po =
+                               new CrossOperatorBase<I1, I2, OUT, 
CrossFunction<I1, I2, OUT>>(function, new BinaryOperatorInformation<I1, I2, 
OUT>(getInput1Type(), getInput2Type(), getResultType()), name);
                // set inputs
                po.setFirstInput(input1);
                po.setSecondInput(input2);
@@ -133,6 +134,9 @@ public class CrossOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OUT,
                        if (function == null) {
                                throw new NullPointerException("Cross function 
must not be null.");
                        }
+                       if (FunctionUtils.isSerializedLambdaFunction(function)) 
{
+                               throw new 
UnsupportedLambdaExpressionException();
+                       }
                        TypeInformation<R> returnType = 
TypeExtractor.getCrossReturnTypes(function, input1.getType(), input2.getType());
                        return new CrossOperator<I1, I2, R>(input1, input2, 
function, returnType);
                }
@@ -220,7 +224,7 @@ public class CrossOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OUT,
                }
        }
 
-       public static final class ProjectCrossFunction<T1, T2, R extends Tuple> 
extends CrossFunction<T1, T2, R> {
+       public static final class ProjectCrossFunction<T1, T2, R extends Tuple> 
implements CrossFunction<T1, T2, R> {
 
                private static final long serialVersionUID = 1L;
 
@@ -1398,7 +1402,7 @@ public class CrossOperator<I1, I2, OUT> extends 
TwoInputUdfOperator<I1, I2, OUT,
        //  default join functions
        // 
--------------------------------------------------------------------------------------------
 
-       public static final class DefaultCrossFunction<T1, T2> extends 
CrossFunction<T1, T2, Tuple2<T1, T2>> {
+       public static final class DefaultCrossFunction<T1, T2> implements 
CrossFunction<T1, T2, Tuple2<T1, T2>> {
 
                private static final long serialVersionUID = 1L;
                

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index cb7db06..591551f 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -21,13 +21,14 @@ package org.apache.flink.api.java.operators;
 import java.util.Iterator;
 
 import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import 
org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -78,15 +79,17 @@ public class DistinctOperator<T> extends 
SingleInputOperator<T, T, DistinctOpera
        @Override
        protected 
org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, T, ?> 
translateToDataFlow(Operator<T> input) {
                
-               GroupReduceFunction<T, T> function = new DistinctFunction<T>();
+               final RichGroupReduceFunction<T, T> function = new 
DistinctFunction<T>();
+               final FlatCombineFunction<T> combineFunction = new 
DistinctCombiner<T>();
+
                String name = function.getClass().getName();
                
                if (keys instanceof Keys.FieldPositionKeys) {
 
                        int[] logicalKeyPositions = 
keys.computeLogicalKeyPositions();
                        UnaryOperatorInformation<T, T> operatorInfo = new 
UnaryOperatorInformation<T, T>(getInputType(), getResultType());
-                       GroupReduceOperatorBase<T, T, GenericGroupReduce<T, T>> 
po =
-                                       new GroupReduceOperatorBase<T, T, 
GenericGroupReduce<T, T>>(function, operatorInfo, logicalKeyPositions, name);
+                       GroupReduceOperatorBase<T, T, GroupReduceFunction<T, 
T>> po =
+                                       new GroupReduceOperatorBase<T, T, 
GroupReduceFunction<T, T>>(function, operatorInfo, logicalKeyPositions, name);
 
                        po.setCombinable(true);
                        po.setInput(input);
@@ -98,9 +101,10 @@ public class DistinctOperator<T> extends 
SingleInputOperator<T, T, DistinctOpera
                
                        @SuppressWarnings("unchecked")
                        Keys.SelectorFunctionKeys<T, ?> selectorKeys = 
(Keys.SelectorFunctionKeys<T, ?>) keys;
-                       
+
+
                        PlanUnwrappingReduceGroupOperator<T, T, ?> po = 
translateSelectorFunctionDistinct(
-                                                       selectorKeys, function, 
getInputType(), getResultType(), name, input, true);
+                                                       selectorKeys, function, 
combineFunction, getInputType(), getResultType(), name, input, true);
                        
                        po.setDegreeOfParallelism(this.getParallelism());
                        
@@ -114,7 +118,7 @@ public class DistinctOperator<T> extends 
SingleInputOperator<T, T, DistinctOpera
        // 
--------------------------------------------------------------------------------------------
        
        private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, 
K> translateSelectorFunctionDistinct(
-                       Keys.SelectorFunctionKeys<IN, ?> rawKeys, 
GroupReduceFunction<IN, OUT> function,
+                       Keys.SelectorFunctionKeys<IN, ?> rawKeys, 
RichGroupReduceFunction<IN, OUT> function, FlatCombineFunction<IN> 
combineFunction,
                        TypeInformation<IN> inputType, TypeInformation<OUT> 
outputType, String name, Operator<IN> input,
                        boolean combinable)
        {
@@ -124,10 +128,12 @@ public class DistinctOperator<T> extends 
SingleInputOperator<T, T, DistinctOpera
                TypeInformation<Tuple2<K, IN>> typeInfoWithKey = new 
TupleTypeInfo<Tuple2<K, IN>>(keys.getKeyType(), inputType);
                
                KeyExtractingMapper<IN, K> extractor = new 
KeyExtractingMapper<IN, K>(keys.getKeyExtractor());
+
+
+               PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer =
+                               new PlanUnwrappingReduceGroupOperator<IN, OUT, 
K>(function, keys, name, outputType, typeInfoWithKey, combinable);
                
-               PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer = new 
PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, 
typeInfoWithKey, combinable);
-               
-               MapOperatorBase<IN, Tuple2<K, IN>, GenericMap<IN, Tuple2<K, 
IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, GenericMap<IN, Tuple2<K, 
IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, 
typeInfoWithKey), "Key Extractor");
+               MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, 
IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, 
IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, 
typeInfoWithKey), "Key Extractor");
 
                reducer.setInput(mapper);
                mapper.setInput(input);
@@ -138,7 +144,7 @@ public class DistinctOperator<T> extends 
SingleInputOperator<T, T, DistinctOpera
                return reducer;
        }
        
-       public static final class DistinctFunction<T> extends 
GroupReduceFunction<T, T> {
+       public static final class DistinctFunction<T> extends 
RichGroupReduceFunction<T, T> {
 
                private static final long serialVersionUID = 1L;
 
@@ -148,4 +154,16 @@ public class DistinctOperator<T> extends 
SingleInputOperator<T, T, DistinctOpera
                        out.collect(values.next());
                }
        }
+
+       public static final class DistinctCombiner<T> implements 
FlatCombineFunction<T> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void combine(Iterator<T> values, Collector<T> out)
+                               throws Exception {
+                       out.collect(values.next());
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
index ed4a786..1c03ccd 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/FilterOperator.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.api.java.operators;
 
-import org.apache.flink.api.common.functions.GenericFlatMap;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.functions.FilterFunction;
 import org.apache.flink.api.java.operators.translation.PlanFilterOperator;
 
 import org.apache.flink.api.java.DataSet;
@@ -44,7 +44,7 @@ public class FilterOperator<T> extends 
SingleInputUdfOperator<T, T, FilterOperat
        }
        
        @Override
-       protected 
org.apache.flink.api.common.operators.base.FilterOperatorBase<T, 
GenericFlatMap<T,T>> translateToDataFlow(Operator<T> input) {
+       protected 
org.apache.flink.api.common.operators.base.FilterOperatorBase<T, 
FlatMapFunction<T,T>> translateToDataFlow(Operator<T> input) {
                
                String name = getName() != null ? getName() : 
function.getClass().getName();
                // create operator

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
index 14c0819..8e531d4 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.api.java.operators;
 
-import org.apache.flink.api.common.functions.GenericFlatMap;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
-import org.apache.flink.api.java.functions.FlatMapFunction;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import org.apache.flink.api.java.DataSet;
@@ -47,11 +46,11 @@ public class FlatMapOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OUT, Fl
        }
 
        @Override
-       protected 
org.apache.flink.api.common.operators.base.FlatMapOperatorBase<IN, OUT, 
GenericFlatMap<IN,OUT>> translateToDataFlow(Operator<IN> input) {
+       protected 
org.apache.flink.api.common.operators.base.FlatMapOperatorBase<IN, OUT, 
FlatMapFunction<IN,OUT>> translateToDataFlow(Operator<IN> input) {
                
                String name = getName() != null ? getName() : 
function.getClass().getName();
                // create operator
-               FlatMapOperatorBase<IN, OUT, GenericFlatMap<IN, OUT>> po = new 
FlatMapOperatorBase<IN, OUT, GenericFlatMap<IN, OUT>>(function, new 
UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
+               FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>> po = new 
FlatMapOperatorBase<IN, OUT, FlatMapFunction<IN, OUT>>(function, new 
UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
                // set input
                po.setInput(input);
                // set dop

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
new file mode 100644
index 0000000..7ab0b11
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.java.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
+import 
org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.types.TypeInformation;
+
+import org.apache.flink.api.java.DataSet;
+
+
+/**
+ * This operator represents the application of a "reduceGroup" function on a 
data set, and the
+ * result data set produced by the function.
+ * 
+ * @param <IN> The type of the data set consumed by the operator.
+ * @param <OUT> The type of the data set created by the operator.
+ */
+public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, 
OUT, GroupReduceOperator<IN, OUT>> {
+
+       private final GroupReduceFunction<IN, OUT> function;
+
+       private final Grouping<IN> grouper;
+
+       // true when the user function is a RichGroupReduceFunction (rich variant of GroupReduceFunction) — assignment site not shown in this hunk
+       private boolean richFunction;
+
+       private boolean combinable;
+
+       /**
+        * Constructor for a non-grouped reduce (all reduce).
+        * 
+        * @param input The input data set to the groupReduce function.
+        * @param function The user-defined GroupReduce function.
+        */
+       public GroupReduceOperator(DataSet<IN> input, GroupReduceFunction<IN, 
OUT> function) {
+               super(input, TypeExtractor.getGroupReduceReturnTypes(function, 
input.getType()));
+               
+               this.function = function;
+               this.grouper = null;
+
+               checkCombinability();
+       }
+       
+       /**
+        * Constructor for a grouped reduce.
+        * 
+        * @param input The grouped input to be processed group-wise by the 
groupReduce function.
+        * @param function The user-defined GroupReduce function.
+        */
+       public GroupReduceOperator(Grouping<IN> input, GroupReduceFunction<IN, 
OUT> function) {
+               super(input != null ? input.getDataSet() : null, 
TypeExtractor.getGroupReduceReturnTypes(function, 
input.getDataSet().getType()));
+               
+               this.function = function;
+               this.grouper = input;
+
+               checkCombinability();
+
+               extractSemanticAnnotationsFromUdf(function.getClass());
+       }
+
+       private void checkCombinability() {
+               if (function instanceof FlatCombineFunction &&
+                               
function.getClass().getAnnotation(RichGroupReduceFunction.Combinable.class) != 
null) {
+                       this.combinable = true;
+               }
+       }
+
+       
+       // 
--------------------------------------------------------------------------------------------
+       //  Properties
+       // 
--------------------------------------------------------------------------------------------
+       
+       public boolean isCombinable() {
+               return combinable;
+       }
+       
+       public void setCombinable(boolean combinable) {
+               // sanity check that the function is a subclass of the combine 
interface
+               if (combinable && !(function instanceof FlatCombineFunction)) {
+                       throw new IllegalArgumentException("The function does 
not implement the combine interface.");
+               }
+               
+               this.combinable = combinable;
+       }
+       
+       @Override
+       protected 
org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, OUT, ?> 
translateToDataFlow(Operator<IN> input) {
+
+               String name = getName() != null ? getName() : 
function.getClass().getName();
+               
+               // distinguish between grouped reduce and non-grouped reduce
+               if (grouper == null) {
+                       // non grouped reduce
+                       UnaryOperatorInformation<IN, OUT> operatorInfo = new 
UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
+                       GroupReduceOperatorBase<IN, OUT, 
GroupReduceFunction<IN, OUT>> po =
+                                       new GroupReduceOperatorBase<IN, OUT, 
GroupReduceFunction<IN, OUT>>(function, operatorInfo, new int[0], name);
+
+                       po.setCombinable(combinable);
+                       // set input
+                       po.setInput(input);
+                       // the degree of parallelism for a non grouped reduce 
can only be 1
+                       po.setDegreeOfParallelism(1);
+                       return po;
+               }
+       
+               if (grouper.getKeys() instanceof Keys.SelectorFunctionKeys) {
+               
+                       @SuppressWarnings("unchecked")
+                       Keys.SelectorFunctionKeys<IN, ?> selectorKeys = 
(Keys.SelectorFunctionKeys<IN, ?>) grouper.getKeys();
+                       
+                       PlanUnwrappingReduceGroupOperator<IN, OUT, ?> po = 
translateSelectorFunctionReducer(
+                                                       selectorKeys, function, 
getInputType(), getResultType(), name, input, isCombinable());
+                       
+                       po.setDegreeOfParallelism(this.getParallelism());
+                       
+                       return po;
+               }
+               else if (grouper.getKeys() instanceof Keys.FieldPositionKeys) {
+
+                       int[] logicalKeyPositions = 
grouper.getKeys().computeLogicalKeyPositions();
+                       UnaryOperatorInformation<IN, OUT> operatorInfo = new 
UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
+                       GroupReduceOperatorBase<IN, OUT, 
GroupReduceFunction<IN, OUT>> po =
+                                       new GroupReduceOperatorBase<IN, OUT, 
GroupReduceFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, 
name);
+
+                       po.setCombinable(combinable);
+                       po.setInput(input);
+                       po.setDegreeOfParallelism(this.getParallelism());
+                       
+                       // set group order
+                       if (grouper instanceof SortedGrouping) {
+                               SortedGrouping<IN> sortedGrouper = 
(SortedGrouping<IN>) grouper;
+                                                               
+                               int[] sortKeyPositions = 
sortedGrouper.getGroupSortKeyPositions();
+                               Order[] sortOrders = 
sortedGrouper.getGroupSortOrders();
+                               
+                               Ordering o = new Ordering();
+                               for(int i=0; i < sortKeyPositions.length; i++) {
+                                       o.appendOrdering(sortKeyPositions[i], 
null, sortOrders[i]);
+                               }
+                               po.setGroupOrder(o);
+                       }
+                       
+                       return po;
+               }
+               else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
+
+                       int[] logicalKeyPositions = 
grouper.getKeys().computeLogicalKeyPositions();
+                       UnaryOperatorInformation<IN, OUT> operatorInfo = new 
UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
+                       GroupReduceOperatorBase<IN, OUT, 
GroupReduceFunction<IN, OUT>> po =
+                                       new GroupReduceOperatorBase<IN, OUT, 
GroupReduceFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
+
+                       po.setCombinable(combinable);
+                       po.setInput(input);
+                       po.setDegreeOfParallelism(this.getParallelism());
+                       
+                       return po;
+               }
+               else {
+                       throw new UnsupportedOperationException("Unrecognized 
key type.");
+               }
+               
+       }
+       
+       
+       // 
--------------------------------------------------------------------------------------------
+       
+       private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, 
K> translateSelectorFunctionReducer(
+                       Keys.SelectorFunctionKeys<IN, ?> rawKeys, 
GroupReduceFunction<IN, OUT> function,
+                       TypeInformation<IN> inputType, TypeInformation<OUT> 
outputType, String name, Operator<IN> input,
+                       boolean combinable)
+       {
+               @SuppressWarnings("unchecked")
+               final Keys.SelectorFunctionKeys<IN, K> keys = 
(Keys.SelectorFunctionKeys<IN, K>) rawKeys;
+               
+               TypeInformation<Tuple2<K, IN>> typeInfoWithKey = new 
TupleTypeInfo<Tuple2<K, IN>>(keys.getKeyType(), inputType);
+               
+               KeyExtractingMapper<IN, K> extractor = new 
KeyExtractingMapper<IN, K>(keys.getKeyExtractor());
+               
+               PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer = new 
PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, 
typeInfoWithKey, combinable);
+               
+               MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, 
IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, 
IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, 
typeInfoWithKey), "Key Extractor");
+
+               reducer.setInput(mapper);
+               mapper.setInput(input);
+               
+               // set the mapper's parallelism to the input parallelism to 
make sure it is chained
+               mapper.setDegreeOfParallelism(input.getDegreeOfParallelism());
+               
+               return reducer;
+       }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
index 37e74ef..3223f4d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java
@@ -26,8 +26,8 @@ import org.apache.flink.api.java.DataSet;
  * Grouping is an intermediate step for a transformation on a grouped 
DataSet.<br/>
  * The following transformation can be applied on Grouping:
  * <ul>
- *     <li>{@link 
UnsortedGrouping#reduce(org.apache.flink.api.java.functions.ReduceFunction)},</li>
- * <li>{@link 
UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.GroupReduceFunction)},
 and</li>
+ *     <li>{@link 
UnsortedGrouping#reduce(org.apache.flink.api.java.functions.RichReduceFunction)},</li>
+ * <li>{@link 
UnsortedGrouping#reduceGroup(org.apache.flink.api.java.functions.RichGroupReduceFunction)},
 and</li>
  * <li>{@link 
UnsortedGrouping#aggregate(org.apache.flink.api.java.aggregation.Aggregations, 
int)}.</li>
  * </ul>
  *

Reply via email to