http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
new file mode 100644
index 0000000..0c8bc97
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/CrossFunction.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+
+/**
+ * @param <IN1> First input type
+ * @param <IN2> Second input type
+ * @param <OUT> Output type
+ */
+public interface CrossFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+       /**
+        * User defined function for the cross operator.
+        * 
+        * @param record1 Record from first input
+        * @param record2 Record from the second input
+        * @return result of cross UDF.
+        * @throws Exception
+        */
+       OUT cross(IN1 record1, IN2 record2) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
new file mode 100644
index 0000000..2f68477
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FilterFunction.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+import java.io.Serializable;
+
+public interface FilterFunction<T> extends Function, Serializable {
+       
+       /**
+        * User defined function for a filter.
+        * 
+        * @param value Incoming tuples
+        * @return true for tuples that are allowed to pass the filter
+        * @throws Exception
+        */
+       boolean filter(T value) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
new file mode 100644
index 0000000..b2c8f30
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatCombineFunction.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.flink.util.Collector;
+
+/**
+ * Generic interface used for combiners.
+ */
+public interface FlatCombineFunction<T> extends Function, Serializable {
+
+       void combine(Iterator<T> values, Collector<T> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
new file mode 100644
index 0000000..6a6b971
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatJoinFunction.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+
+public interface FlatJoinFunction<IN1, IN2, OUT> extends Function, 
Serializable {
+
+       void join (IN1 left, IN2 right, Collector<OUT> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
new file mode 100644
index 0000000..a8696cf
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/FlatMapFunction.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+
+/**
+ *
+ * @param <T>
+ * @param <O>
+ */
+public interface FlatMapFunction<T, O> extends Function, Serializable {
+
+       /**
+        * The core method of FlatMappable. Takes an element from the input 
data set and transforms
+        * it into zero, one, or more elements.
+        *
+        * @param value The input value.
+        * @param out The collector for for emitting result values.
+        *
+        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+        *                   to fail and may trigger recovery.
+        */
+       void flatMap(T value, Collector<O> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
index d3b7db4..c2a201f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java
@@ -16,82 +16,13 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.functions;
 
-import org.apache.flink.configuration.Configuration;
-
 /**
- * An base interface for all user-defined functions. This class defines 
methods for
- * the life cycle of the functions, as well as methods to access the context 
in which the functions
- * are executed.
+ * An base interface for all user-defined functions. This interface is empty 
in order
+ * to enable functions that are SAM (single abstract method) interfaces, so 
that they
+ * can be called as Java 8 lambdas
  */
 public interface Function {
-       
-       /**
-        * Initialization method for the function. It is called before the 
actual working methods 
-        * (like <i>map</i> or <i>join</i>) and thus suitable for one time 
setup work. For functions that
-        * are part of an iteration, this method will be invoked at the 
beginning of each iteration superstep.
-        * <p>
-        * The configuration object passed to the function can be used for 
configuration and initialization.
-        * The configuration contains all parameters that were configured on 
the function in the program
-        * composition.
-        * 
-        * <pre><blockquote>
-        * public class MyMapper extends FilterFunction<String> {
-        * 
-        *     private String searchString;
-        *     
-        *     public void open(Configuration parameters) {
-        *         this.searchString = parameters.getString("foo");
-        *     }
-        *     
-        *     public boolean filter(String value) {
-        *         return value.equals(searchString);
-        *     }
-        * }
-        * </blockquote></pre>
-        * <p>
-        * By default, this method does nothing.
-        * 
-        * @param parameters The configuration containing the parameters 
attached to the contract. 
-        * 
-        * @throws Exception Implementations may forward exceptions, which are 
caught by the runtime. When the
-        *                   runtime catches an exception, it aborts the task 
and lets the fail-over logic
-        *                   decide whether to retry the task execution.
-        * 
-        * @see org.apache.flink.configuration.Configuration
-        */
-       void open(Configuration parameters) throws Exception;
 
-       /**
-        * Teardown method for the user code. It is called after the last call 
to the main working methods
-        * (e.g. <i>map</i> or <i>join</i>). For functions that  are part of an 
iteration, this method will
-        * be invoked after each iteration superstep.
-        * <p>
-        * This method can be used for clean up work.
-        * 
-        * @throws Exception Implementations may forward exceptions, which are 
caught by the runtime. When the
-        *                   runtime catches an exception, it aborts the task 
and lets the fail-over logic
-        *                   decide whether to retry the task execution.
-        */
-       void close() throws Exception;
-       
-       
-       /**
-        * Gets the context that contains information about the UDF's runtime.
-        * 
-        * Context information are for example {@link 
org.apache.flink.api.common.accumulators.Accumulator}s
-        * or the {@link org.apache.flink.api.common.cache.DistributedCache}.
-        * 
-        * @return The UDF's runtime context.
-        */
-       RuntimeContext getRuntimeContext();
-       
-       /**
-        * Sets the function's runtime context. Called by the framework when 
creating a parallel instance of the function.
-        *  
-        * @param t The runtime context.
-        */
-       void setRuntimeContext(RuntimeContext t);
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCoGrouper.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCoGrouper.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCoGrouper.java
deleted file mode 100644
index 59669a2..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCoGrouper.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-import java.util.Iterator;
-
-import org.apache.flink.util.Collector;
-
-
-public interface GenericCoGrouper<V1, V2, O> extends Function {
-       
-       /**
-        * This method must be implemented to provide a user implementation of a
-        * coGroup. It is called for each two key-value pairs that share the 
same
-        * key and come from different inputs.
-        * 
-        * @param records1 The records from the first input which were paired 
with the key.
-        * @param records2 The records from the second input which were paired 
with the key.
-        * @param out A collector that collects all output pairs.
-        */
-       void coGroup(Iterator<V1> records1, Iterator<V2> records2, Collector<O> 
out) throws Exception;
-       
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
index ada4eeb..41cfa1d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
@@ -23,7 +23,7 @@ import org.apache.flink.util.Collector;
 
 
 
-public interface GenericCollectorMap<T, O> extends Function {
+public interface GenericCollectorMap<T, O> extends RichFunction {
        
        void map(T record, Collector<O> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCombine.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCombine.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCombine.java
deleted file mode 100644
index 8dfe758..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCombine.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-import java.util.Iterator;
-
-import org.apache.flink.util.Collector;
-
-/**
- * Generic interface used for combiners.
- */
-public interface GenericCombine<T> extends Function {
-
-       void combine(Iterator<T> records, Collector<T> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCrosser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCrosser.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCrosser.java
deleted file mode 100644
index 3de9b1d..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCrosser.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-import org.apache.flink.util.Collector;
-
-
-/**
- * @param <V1> First input type
- * @param <V2> Second input type
- * @param <O> Output type
- */
-public interface GenericCrosser<V1, V2, O> extends Function {
-
-       /**
-        * User defined function for the cross operator.
-        * 
-        * @param record1 Record from first input
-        * @param record2 Record from the second input
-        * @param out Collector to submit resulting records.
-        * @throws Exception
-        */
-       void cross(V1 record1, V2 record2, Collector<O> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFilter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFilter.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFilter.java
deleted file mode 100644
index f34b038..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFilter.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-
-public interface GenericFilter<T> extends Function {
-       
-       /**
-        * User defined function for a filter.
-        * 
-        * @param value Incoming tuples
-        * @return true for tuples that are allowed to pass the filter
-        * @throws Exception
-        */
-       boolean filter(T value) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFlatMap.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFlatMap.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFlatMap.java
deleted file mode 100644
index efb1d49..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericFlatMap.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-import org.apache.flink.util.Collector;
-
-
-/**
- *
- * @param <T>
- * @param <O>
- */
-public interface GenericFlatMap<T, O> extends Function {
-       
-       /**
-        * User defined function to perform transformations on records.
-        * This method allows to submit an arbitrary number of records
-        * per incoming tuple.
-        * 
-        * @param record incoming record
-        * @param out outgoing collector to return none, one or more records
-        * @throws Exception
-        */
-       void flatMap(T record, Collector<O> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericGroupReduce.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericGroupReduce.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericGroupReduce.java
deleted file mode 100644
index e8d9910..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericGroupReduce.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-import java.util.Iterator;
-
-import org.apache.flink.util.Collector;
-
-
-/**
- *
- * @param <T> Incoming types
- * @param <O> Outgoing types
- */
-public interface GenericGroupReduce<T, O> extends Function {
-       /**
-        * 
-        * The central function to be implemented for a reducer. The function 
receives per call one
-        * key and all the values that belong to that key. Each key is 
guaranteed to be processed by exactly
-        * one function call across all involved instances across all computing 
nodes.
-        * 
-        * @param records All records that belong to the given input key.
-        * @param out The collector to hand results to.
-        * @throws Exception
-        */
-       void reduce(Iterator<T> records, Collector<O> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericJoiner.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericJoiner.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericJoiner.java
deleted file mode 100644
index 77c2ac9..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericJoiner.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-import org.apache.flink.util.Collector;
-
-
-public interface GenericJoiner<V1, V2, O> extends Function {
-       
-       void join(V1 value1, V2 value2, Collector<O> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericMap.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericMap.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericMap.java
deleted file mode 100644
index 316bf5d..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericMap.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-
-public interface GenericMap<T, O> extends Function {
-       
-       /**
-        * A user-implemented function that modifies or transforms an incoming 
object and
-        * returns the result.
-        */
-       O map(T record) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericReduce.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericReduce.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericReduce.java
deleted file mode 100644
index 9e75f2e..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericReduce.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.functions;
-
-
-public interface GenericReduce<T> extends Function {
-       
-       T reduce(T value1, T value2) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
new file mode 100644
index 0000000..984d1fd
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.flink.util.Collector;
+
+
+/**
+ *
+ * @param <T> Incoming types
+ * @param <O> Outgoing types
+ */
+public interface GroupReduceFunction<T, O> extends Function, Serializable {
+       /**
+        * 
+        * The central function to be implemented for a reducer. The function 
receives per call one
+        * key and all the values that belong to that key. Each key is 
guaranteed to be processed by exactly
+        * one function call across all involved instances across all computing 
nodes.
+        * 
+        * @param records All records that belong to the given input key.
+        * @param out The collector to hand results to.
+        * @throws Exception
+        */
+       void reduce(Iterator<T> values, Collector<O> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
new file mode 100644
index 0000000..02f526a
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/JoinFunction.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import java.io.Serializable;
+
+
+public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+       OUT join(IN1 first, IN2 second) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
new file mode 100644
index 0000000..4e2520d
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/MapFunction.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+
+import java.io.Serializable;
+
+public interface MapFunction<T, O> extends Function, Serializable {
+
+       /**
+        * The core method of Mappable. Takes an element from the input data 
set and transforms
+        * it into exactly one element.
+        *
+        * @param value The input value.
+        * @returns  The transformed value
+        *
+        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+        *                   to fail and may trigger recovery.
+        */
+       O map(T value) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
new file mode 100644
index 0000000..04f690a
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/ReduceFunction.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+
+import java.io.Serializable;
+
+public interface ReduceFunction<T> extends Function, Serializable {
+
+       /**
+        * The core method of Reducible, combining two values into one value of 
the same type.
+        * The reduce function is consecutively applied to all values of a 
group until only a single value remains.
+        *
+        * @param value1 The first value to combine.
+        * @param value2 The second value to combine.
+        * @return The combined value of both input values.
+        *
+        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
+        *                   to fail and may trigger recovery.
+        */
+       T reduce(T value1, T value2) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
new file mode 100644
index 0000000..ffc3ac2
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.common.functions;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * An base interface for all rich user-defined functions. This class defines 
methods for
+ * the life cycle of the functions, as well as methods to access the context 
in which the functions
+ * are executed.
+ */
+public interface RichFunction extends Function {
+       
+       /**
+        * Initialization method for the function. It is called before the 
actual working methods 
+        * (like <i>map</i> or <i>join</i>) and thus suitable for one time 
setup work. For functions that
+        * are part of an iteration, this method will be invoked at the 
beginning of each iteration superstep.
+        * <p>
+        * The configuration object passed to the function can be used for 
configuration and initialization.
+        * The configuration contains all parameters that were configured on 
the function in the program
+        * composition.
+        * 
+        * <pre><blockquote>
+        * public class MyMapper extends FilterFunction<String> {
+        * 
+        *     private String searchString;
+        *     
+        *     public void open(Configuration parameters) {
+        *         this.searchString = parameters.getString("foo");
+        *     }
+        *     
+        *     public boolean filter(String value) {
+        *         return value.equals(searchString);
+        *     }
+        * }
+        * </blockquote></pre>
+        * <p>
+        * By default, this method does nothing.
+        * 
+        * @param parameters The configuration containing the parameters 
attached to the contract. 
+        * 
+        * @throws Exception Implementations may forward exceptions, which are 
caught by the runtime. When the
+        *                   runtime catches an exception, it aborts the task 
and lets the fail-over logic
+        *                   decide whether to retry the task execution.
+        * 
+        * @see org.apache.flink.configuration.Configuration
+        */
+       void open(Configuration parameters) throws Exception;
+
+       /**
+        * Teardown method for the user code. It is called after the last call 
to the main working methods
+        * (e.g. <i>map</i> or <i>join</i>). For functions that  are part of an 
iteration, this method will
+        * be invoked after each iteration superstep.
+        * <p>
+        * This method can be used for clean up work.
+        * 
+        * @throws Exception Implementations may forward exceptions, which are 
caught by the runtime. When the
+        *                   runtime catches an exception, it aborts the task 
and lets the fail-over logic
+        *                   decide whether to retry the task execution.
+        */
+       void close() throws Exception;
+       
+       
+       /**
+        * Gets the context that contains information about the UDF's runtime.
+        * 
+        * Context information are for example {@link 
org.apache.flink.api.common.accumulators.Accumulator}s
+        * or the {@link org.apache.flink.api.common.cache.DistributedCache}.
+        * 
+        * @return The UDF's runtime context.
+        */
+       RuntimeContext getRuntimeContext();
+       
+       /**
+        * Sets the function's runtime context. Called by the framework when 
creating a parallel instance of the function.
+        *  
+        * @param t The runtime context.
+        */
+       void setRuntimeContext(RuntimeContext t);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index a5f0b19..3cf30ff 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -34,7 +34,7 @@ import org.apache.flink.api.common.cache.DistributedCache;
  * the current degree of parallelism) and other constructs like accumulators 
and broadcast variables.
  * <p>
  * A function can, during runtime, obtain the RuntimeContext via a call to
- * {@link 
org.apache.flink.api.common.functions.AbstractFunction#getRuntimeContext()}.
+ * {@link AbstractRichFunction#getRuntimeContext()}.
  */
 public interface RuntimeContext {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
new file mode 100644
index 0000000..bc4ffd0
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.functions.util;
+
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+
+import java.lang.invoke.SerializedLambda;
+import java.lang.reflect.Method;
+
+public class FunctionUtils {
+
+
+       public static void openFunction (Function function, Configuration 
parameters) throws Exception{
+               if (function instanceof RichFunction) {
+                       RichFunction richFunction = (RichFunction) function;
+                       richFunction.open (parameters);
+               }
+       }
+
+       public static void closeFunction (Function function) throws Exception{
+               if (function instanceof RichFunction) {
+                       RichFunction richFunction = (RichFunction) function;
+                       richFunction.close ();
+               }
+       }
+
+       public static void setFunctionRuntimeContext (Function function, 
RuntimeContext context){
+               if (function instanceof RichFunction) {
+                       RichFunction richFunction = (RichFunction) function;
+                       richFunction.setRuntimeContext(context);
+               }
+       }
+
+       public static RuntimeContext getFunctionRuntimeContext (Function 
function, RuntimeContext defaultContext){
+               if (function instanceof RichFunction) {
+                       RichFunction richFunction = (RichFunction) function;
+                       return richFunction.getRuntimeContext();
+               }
+               else {
+                       return defaultContext;
+               }
+       }
+
+       public static boolean isSerializedLambdaFunction(Function function) {
+               Class<?> clazz = function.getClass();
+               try {
+                       Method replaceMethod = 
clazz.getDeclaredMethod("writeReplace");
+                       replaceMethod.setAccessible(true);
+                       Object serializedForm = replaceMethod.invoke(function);
+                       if (serializedForm instanceof SerializedLambda) {
+                               return true;
+                       }
+                       else {
+                               return false;
+                       }
+               }
+               catch (Exception e) {
+                       return false;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
index b140dda..c416765 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Union.java
@@ -19,13 +19,13 @@
 
 package org.apache.flink.api.common.operators;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 
 /**
  * This operator represents a Union between two inputs.
  */
-public class Union<T> extends DualInputOperator<T, T, T, AbstractFunction> {
+public class Union<T> extends DualInputOperator<T, T, T, AbstractRichFunction> 
{
        
        private final static String NAME = "Union";
        
@@ -34,7 +34,7 @@ public class Union<T> extends DualInputOperator<T, T, T, 
AbstractFunction> {
         */
        public Union(BinaryOperatorInformation<T, T, T> operatorInfo) {
                // we pass it an AbstractFunction, because currently all 
operators expect some form of UDF
-               super(new 
UserCodeClassWrapper<AbstractFunction>(AbstractFunction.class), operatorInfo, 
NAME);
+               super(new 
UserCodeClassWrapper<AbstractRichFunction>(AbstractRichFunction.class), 
operatorInfo, NAME);
        }
        
        public Union(Operator<T> input1, Operator<T> input2) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
index ffcab50..66bea7f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.AggregatorRegistry;
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.GenericCollectorMap;
 import org.apache.flink.api.common.operators.IterationOperator;
 import org.apache.flink.api.common.operators.Operator;
@@ -48,7 +48,7 @@ import org.apache.flink.util.Visitor;
 /**
  * 
  */
-public class BulkIterationBase<T> extends SingleInputOperator<T, T, 
AbstractFunction> implements IterationOperator {
+public class BulkIterationBase<T> extends SingleInputOperator<T, T, 
AbstractRichFunction> implements IterationOperator {
        
        private static String DEFAULT_NAME = "<Unnamed Bulk Iteration>";
        
@@ -78,7 +78,7 @@ public class BulkIterationBase<T> extends 
SingleInputOperator<T, T, AbstractFunc
         * @param name
         */
        public BulkIterationBase(UnaryOperatorInformation<T, T> operatorInfo, 
String name) {
-               super(new 
UserCodeClassWrapper<AbstractFunction>(AbstractFunction.class), operatorInfo, 
name);
+               super(new 
UserCodeClassWrapper<AbstractRichFunction>(AbstractRichFunction.class), 
operatorInfo, name);
                inputPlaceHolder = new PartialSolutionPlaceHolder<T>(this, 
this.getOperatorInfo());
        }
 
@@ -230,7 +230,7 @@ public class BulkIterationBase<T> extends 
SingleInputOperator<T, T, AbstractFunc
        /**
         * Special Mapper that is added before a termination criterion and is 
only a container for an special aggregator
         */
-       public static class TerminationCriterionMapper<X> extends 
AbstractFunction implements Serializable, GenericCollectorMap<X, Nothing> {
+       public static class TerminationCriterionMapper<X> extends 
AbstractRichFunction implements Serializable, GenericCollectorMap<X, Nothing> {
                private static final long serialVersionUID = 1L;
                
                private TerminationCriterionAggregator aggregator;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
index a9ae97c..4b85a31 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputOperator;
 import org.apache.flink.api.common.operators.Ordering;
@@ -28,9 +28,9 @@ import 
org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 
 /**
- * @see GenericCoGrouper
+ * @see org.apache.flink.api.common.functions.CoGroupFunction
  */
-public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends 
GenericCoGrouper<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> {
+public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends 
CoGroupFunction<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> {
        
        /**
         * The ordering for the order inside a group from input one.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
index 33e150d..a66ea72 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import org.apache.flink.api.common.functions.GenericCrosser;
+import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputOperator;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -28,9 +28,9 @@ import 
org.apache.flink.api.common.operators.util.UserCodeWrapper;
 
 
 /**
- * @see GenericCrosser
+ * @see org.apache.flink.api.common.functions.CrossFunction
  */
-public class CrossOperatorBase<IN1, IN2, OUT, FT extends GenericCrosser<?, ?, 
?>> extends DualInputOperator<IN1, IN2, OUT, FT> {
+public class CrossOperatorBase<IN1, IN2, OUT, FT extends CrossFunction<?, ?, 
?>> extends DualInputOperator<IN1, IN2, OUT, FT> {
        
        public CrossOperatorBase(UserCodeWrapper<FT> udf, 
BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, String name) {
                super(udf, operatorInfo, name);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
index 89e5008..8e955b1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
@@ -23,7 +23,7 @@ import java.util.Collections;
 import java.util.Map;
 
 import org.apache.flink.api.common.aggregators.AggregatorRegistry;
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputOperator;
 import org.apache.flink.api.common.operators.IterationOperator;
@@ -51,7 +51,7 @@ import org.apache.flink.util.Visitor;
  * This class is a subclass of {@code DualInputOperator}. The solution set is 
considered the first input, the
  * workset is considered the second input.
  */
-public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, 
AbstractFunction> implements IterationOperator {
+public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, 
AbstractRichFunction> implements IterationOperator {
 
        private final Operator<ST> solutionSetPlaceholder;
 
@@ -88,7 +88,7 @@ public class DeltaIterationBase<ST, WT> extends 
DualInputOperator<ST, WT, ST, Ab
        }
 
        public DeltaIterationBase(BinaryOperatorInformation<ST, WT, ST> 
operatorInfo, int[] keyPositions, String name) {
-               super(new 
UserCodeClassWrapper<AbstractFunction>(AbstractFunction.class), operatorInfo, 
name);
+               super(new 
UserCodeClassWrapper<AbstractRichFunction>(AbstractRichFunction.class), 
operatorInfo, name);
                this.solutionSetKeyFields = keyPositions;
                solutionSetPlaceholder = new SolutionSetPlaceHolder<ST>(this, 
new OperatorInformation<ST>(operatorInfo.getFirstInputType()));
                worksetPlaceholder = new WorksetPlaceHolder<WT>(this, new 
OperatorInformation<WT>(operatorInfo.getSecondInputType()));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
index 34896a2..3c28c43 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import org.apache.flink.api.common.functions.GenericFlatMap;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -28,9 +28,9 @@ import 
org.apache.flink.api.common.operators.util.UserCodeWrapper;
 
 
 /**
- * @see GenericFlatMap
+ * @see org.apache.flink.api.common.functions.FlatMapFunction
  */
-public class FilterOperatorBase<T, FT extends GenericFlatMap<T, T>> extends 
SingleInputOperator<T, T, FT> {
+public class FilterOperatorBase<T, FT extends FlatMapFunction<T, T>> extends 
SingleInputOperator<T, T, FT> {
        
        public FilterOperatorBase(UserCodeWrapper<FT> udf, 
UnaryOperatorInformation<T, T> operatorInfo, String name) {
                super(udf, operatorInfo, name);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
index 0de236e..89575b6 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import org.apache.flink.api.common.functions.GenericFlatMap;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -28,9 +28,9 @@ import 
org.apache.flink.api.common.operators.util.UserCodeWrapper;
 
 
 /**
- * @see GenericFlatMap
+ * @see org.apache.flink.api.common.functions.FlatMapFunction
  */
-public class FlatMapOperatorBase<IN, OUT, FT extends GenericFlatMap<IN, OUT>> 
extends SingleInputOperator<IN, OUT, FT> {
+public class FlatMapOperatorBase<IN, OUT, FT extends FlatMapFunction<IN, OUT>> 
extends SingleInputOperator<IN, OUT, FT> {
        
        public FlatMapOperatorBase(UserCodeWrapper<FT> udf, 
UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
                super(udf, operatorInfo, name);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
index a24826a..ac55489 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
@@ -19,8 +19,9 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import org.apache.flink.api.common.functions.GenericCombine;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
@@ -30,9 +31,9 @@ import 
org.apache.flink.api.common.operators.util.UserCodeWrapper;
 
 
 /**
- * @see GenericGroupReduce
+ * @see org.apache.flink.api.common.functions.GroupReduceFunction
  */
-public class GroupReduceOperatorBase<IN, OUT, FT extends 
GenericGroupReduce<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
+public class GroupReduceOperatorBase<IN, OUT, FT extends 
GroupReduceFunction<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
 
        /**
         * The ordering for the order inside a reduce group.
@@ -91,15 +92,15 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends 
GenericGroupReduce<IN,
        /**
         * Marks the group reduce operation as combinable. Combinable 
operations may pre-reduce the
         * data before the actual group reduce operations. Combinable 
user-defined functions
-        * must implement the interface {@link GenericCombine}.
+        * must implement the interface {@link 
org.apache.flink.api.common.functions.FlatCombineFunction}.
         * 
         * @param combinable Flag to mark the group reduce operation as 
combinable.
         */
        public void setCombinable(boolean combinable) {
                // sanity check
-               if (combinable && 
!GenericCombine.class.isAssignableFrom(this.userFunction.getUserCodeClass())) {
-                       throw new IllegalArgumentException("Cannot set a UDF as 
combinable if it does not implement the interface " + 
-                                       GenericCombine.class.getName());
+               if (combinable && 
!FlatCombineFunction.class.isAssignableFrom(this.userFunction.getUserCodeClass()))
 {
+                       throw new IllegalArgumentException("Cannot set a UDF as 
combinable if it does not implement the interface " +
+                                       FlatCombineFunction.class.getName());
                } else {
                        this.combinable = combinable;
                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
index b4eeeaa..2ce0529 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputOperator;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -27,9 +27,9 @@ import 
org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 
 /**
- * @see GenericJoiner
+ * @see org.apache.flink.api.common.functions.FlatJoinFunction
  */
-public class JoinOperatorBase<IN1, IN2, OUT, FT extends GenericJoiner<IN1, 
IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT>
+public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, 
IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT>
 {
        public JoinOperatorBase(UserCodeWrapper<FT> udf, 
BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, 
int[] keyPositions2, String name) {
                super(udf, operatorInfo, keyPositions1, keyPositions2, name);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
index efd8fa9..26fde05 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -33,7 +33,7 @@ import 
org.apache.flink.api.common.operators.util.UserCodeWrapper;
  * @param <OUT> The result type.
  * @param <FT> The type of the user-defined function.
  */
-public class MapOperatorBase<IN, OUT, FT extends GenericMap<IN, OUT>> extends 
SingleInputOperator<IN, OUT, FT> {
+public class MapOperatorBase<IN, OUT, FT extends MapFunction<IN, OUT>> extends 
SingleInputOperator<IN, OUT, FT> {
        
        public MapOperatorBase(UserCodeWrapper<FT> udf, 
UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
                super(udf, operatorInfo, name);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
index 62996ea..e6c435f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -31,12 +31,12 @@ import 
org.apache.flink.api.common.operators.util.UserCodeWrapper;
  * Base data flow operator for Reduce user-defined functions. Accepts reduce 
functions
  * and key positions. The key positions are expected in the flattened common 
data model.
  * 
- * @see GenericReduce
+ * @see org.apache.flink.api.common.functions.ReduceFunction
  *
  * @param <T> The type (parameters and return type) of the reduce function.
  * @param <FT> The type of the reduce function.
  */
-public class ReduceOperatorBase<T, FT extends GenericReduce<T>> extends 
SingleInputOperator<T, T, FT> {
+public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends 
SingleInputOperator<T, T, FT> {
 
        /**
         * Creates a grouped reduce data flow operator.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
index 2ebae24..7d6495b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/OperatorUtil.java
@@ -25,11 +25,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.GenericCollectorMap;
-import org.apache.flink.api.common.functions.GenericCrosser;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.io.InputFormat;
@@ -56,10 +56,10 @@ public class OperatorUtil {
 
        static {
                STUB_CONTRACTS.put(GenericCollectorMap.class, 
CollectorMapOperatorBase.class);
-               STUB_CONTRACTS.put(GenericGroupReduce.class, 
GroupReduceOperatorBase.class);
-               STUB_CONTRACTS.put(GenericCoGrouper.class, 
CoGroupOperatorBase.class);
-               STUB_CONTRACTS.put(GenericCrosser.class, 
CrossOperatorBase.class);
-               STUB_CONTRACTS.put(GenericJoiner.class, JoinOperatorBase.class);
+               STUB_CONTRACTS.put(GroupReduceFunction.class, 
GroupReduceOperatorBase.class);
+               STUB_CONTRACTS.put(CoGroupFunction.class, 
CoGroupOperatorBase.class);
+               STUB_CONTRACTS.put(CrossFunction.class, 
CrossOperatorBase.class);
+               STUB_CONTRACTS.put(FlatJoinFunction.class, 
JoinOperatorBase.class);
                STUB_CONTRACTS.put(FileInputFormat.class, 
GenericDataSourceBase.class);
                STUB_CONTRACTS.put(FileOutputFormat.class, 
GenericDataSinkBase.class);
                STUB_CONTRACTS.put(InputFormat.class, 
GenericDataSourceBase.class);
@@ -67,7 +67,7 @@ public class OperatorUtil {
        }
 
        /**
-        * Returns the associated {@link Operator} type for the given {@link 
org.apache.flink.api.common.functions.Function} class.
+        * Returns the associated {@link Operator} type for the given {@link 
org.apache.flink.api.common.functions.RichFunction} class.
         * 
         * @param stubClass
         *        the stub class

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
index 647ceab..30091ab 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/OperatorUtilTest.java
@@ -21,12 +21,12 @@ package org.apache.flink.api.common.operators.util;
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.GenericCollectorMap;
-import org.apache.flink.api.common.functions.GenericCrosser;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.io.DelimitedInputFormat;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
@@ -36,7 +36,6 @@ import 
org.apache.flink.api.common.operators.base.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.base.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.operators.util.OperatorUtil;
 import org.apache.flink.types.IntValue;
 import org.junit.Test;
 
@@ -85,7 +84,7 @@ public class OperatorUtilTest {
         */
        @Test
        public void getContractClassShouldReturnNullForStub() {
-               final Class<?> result = 
OperatorUtil.getContractClass(Function.class);
+               final Class<?> result = 
OperatorUtil.getContractClass(RichFunction.class);
                assertEquals(null, result);
        }
 
@@ -116,13 +115,13 @@ public class OperatorUtilTest {
                assertEquals(GenericDataSourceBase.class, result);
        }
 
-       static abstract class CoGrouper implements GenericCoGrouper<IntValue, 
IntValue, IntValue> {}
+       static abstract class CoGrouper implements CoGroupFunction<IntValue, 
IntValue, IntValue> {}
 
-       static abstract class Crosser implements GenericCrosser<IntValue, 
IntValue, IntValue> {}
+       static abstract class Crosser implements CrossFunction<IntValue, 
IntValue, IntValue> {}
 
        static abstract class Mapper implements GenericCollectorMap<IntValue, 
IntValue> {}
 
-       static abstract class Matcher implements GenericJoiner<IntValue, 
IntValue, IntValue> {}
+       static abstract class Matcher implements FlatJoinFunction<IntValue, 
IntValue, IntValue> {}
 
-       static abstract class Reducer implements GenericGroupReduce<IntValue, 
IntValue> {}
+       static abstract class Reducer implements GroupReduceFunction<IntValue, 
IntValue> {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/pom.xml 
b/flink-examples/flink-java-examples/pom.xml
index 549e95b..ea0db5d 100644
--- a/flink-examples/flink-java-examples/pom.xml
+++ b/flink-examples/flink-java-examples/pom.xml
@@ -317,7 +317,6 @@ under the License.
                                        
                                </executions>
                        </plugin>
-
                </plugins>
        </build>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java
index 4c22db1..8767aca 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/clustering/KMeans.java
@@ -21,8 +21,9 @@ package org.apache.flink.example.java.clustering;
 import java.io.Serializable;
 import java.util.Collection;
 
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
@@ -197,7 +198,7 @@ public class KMeans {
        // 
*************************************************************************
        
        /** Converts a Tuple2<Double,Double> into a Point. */
-       public static final class TuplePointConverter extends 
MapFunction<Tuple2<Double, Double>, Point> {
+       public static final class TuplePointConverter implements 
MapFunction<Tuple2<Double, Double>, Point> {
 
                @Override
                public Point map(Tuple2<Double, Double> t) throws Exception {
@@ -206,7 +207,7 @@ public class KMeans {
        }
        
        /** Converts a Tuple3<Integer, Double,Double> into a Centroid. */
-       public static final class TupleCentroidConverter extends 
MapFunction<Tuple3<Integer, Double, Double>, Centroid> {
+       public static final class TupleCentroidConverter implements 
MapFunction<Tuple3<Integer, Double, Double>, Centroid> {
 
                @Override
                public Centroid map(Tuple3<Integer, Double, Double> t) throws 
Exception {
@@ -215,7 +216,7 @@ public class KMeans {
        }
        
        /** Determines the closest cluster center for a data point. */
-       public static final class SelectNearestCenter extends 
MapFunction<Point, Tuple2<Integer, Point>> {
+       public static final class SelectNearestCenter extends 
RichMapFunction<Point, Tuple2<Integer, Point>> {
                private Collection<Centroid> centroids;
 
                /** Reads the centroid values from a broadcast variable into a 
collection. */
@@ -248,7 +249,7 @@ public class KMeans {
        }
        
        /** Appends a count variable to the tuple. */ 
-       public static final class CountAppender extends 
MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> {
+       public static final class CountAppender implements 
MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> {
 
                @Override
                public Tuple3<Integer, Point, Long> map(Tuple2<Integer, Point> 
t) {
@@ -257,7 +258,7 @@ public class KMeans {
        }
        
        /** Sums and counts point coordinates. */
-       public static final class CentroidAccumulator extends 
ReduceFunction<Tuple3<Integer, Point, Long>> {
+       public static final class CentroidAccumulator implements 
ReduceFunction<Tuple3<Integer, Point, Long>> {
 
                @Override
                public Tuple3<Integer, Point, Long> reduce(Tuple3<Integer, 
Point, Long> val1, Tuple3<Integer, Point, Long> val2) {
@@ -266,7 +267,7 @@ public class KMeans {
        }
        
        /** Computes new centroid from coordinate sum and count of points. */
-       public static final class CentroidAverager extends 
MapFunction<Tuple3<Integer, Point, Long>, Centroid> {
+       public static final class CentroidAverager implements 
MapFunction<Tuple3<Integer, Point, Long>, Centroid> {
 
                @Override
                public Centroid map(Tuple3<Integer, Point, Long> value) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/ConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/ConnectedComponents.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/ConnectedComponents.java
index b71347a..0d38e06 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/ConnectedComponents.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/ConnectedComponents.java
@@ -20,10 +20,11 @@
 package org.apache.flink.example.java.graph;
 
 import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
-import org.apache.flink.api.java.functions.MapFunction;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
 import 
org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst;
 import 
org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond;
@@ -127,7 +128,7 @@ public class ConnectedComponents implements 
ProgramDescription {
         * Function that turns a value into a 2-tuple where both fields are 
that value.
         */
        @ConstantFields("0 -> 0,1") 
-       public static final class DuplicateValue<T> extends MapFunction<T, 
Tuple2<T, T>> {
+       public static final class DuplicateValue<T> implements MapFunction<T, 
Tuple2<T, T>> {
                
                @Override
                public Tuple2<T, T> map(T vertex) {
@@ -138,7 +139,7 @@ public class ConnectedComponents implements 
ProgramDescription {
        /**
         * Undirected edges by emitting for each input edge the input edges 
itself and an inverted version.
         */
-       public static final class UndirectEdge extends 
FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+       public static final class UndirectEdge implements 
FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
                Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
                
                @Override
@@ -157,7 +158,7 @@ public class ConnectedComponents implements 
ProgramDescription {
         */
        @ConstantFieldsFirst("1 -> 0")
        @ConstantFieldsSecond("1 -> 1")
-       public static final class NeighborWithComponentIDJoin extends 
JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+       public static final class NeighborWithComponentIDJoin implements 
JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
                @Override
                public Tuple2<Long, Long> join(Tuple2<Long, Long> 
vertexWithComponent, Tuple2<Long, Long> edge) {
@@ -165,11 +166,10 @@ public class ConnectedComponents implements 
ProgramDescription {
                }
        }
        
-       /**
-        * The input is nested tuples ( (vertex-id, candidate-component) , 
(vertex-id, current-component) )
-        */
+
+
        @ConstantFieldsFirst("0")
-       public static final class ComponentIdFilter extends 
JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+       public static final class ComponentIdFilter implements 
FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
                @Override
                public void join(Tuple2<Long, Long> candidate, Tuple2<Long, 
Long> old, Collector<Tuple2<Long, Long>> out) {
@@ -177,10 +177,10 @@ public class ConnectedComponents implements 
ProgramDescription {
                                out.collect(candidate);
                        }
                }
-               @Override
-               public Tuple2<Long, Long> join(Tuple2<Long, Long> first, 
Tuple2<Long, Long> second) { return null; }
        }
 
+
+
        @Override
        public String getDescription() {
                return "Parameters: <vertices-path> <edges-path> <result-path> 
<max-number-of-iterations>";

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
index fba18fc..2d794bd 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/EnumTrianglesBasic.java
@@ -22,10 +22,10 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.JoinFunction;
-import org.apache.flink.api.java.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 import org.apache.flink.api.java.DataSet;
@@ -119,7 +119,7 @@ public class EnumTrianglesBasic {
        // 
*************************************************************************
 
        /** Converts a Tuple2 into an Edge */
-       public static class TupleEdgeConverter extends 
MapFunction<Tuple2<Integer, Integer>, Edge> {
+       public static class TupleEdgeConverter implements 
MapFunction<Tuple2<Integer, Integer>, Edge> {
                private final Edge outEdge = new Edge();
                
                @Override
@@ -130,7 +130,7 @@ public class EnumTrianglesBasic {
        }
        
        /** Projects an edge (pair of vertices) such that the id of the first 
is smaller than the id of the second. */
-       private static class EdgeByIdProjector extends MapFunction<Edge, Edge> {
+       private static class EdgeByIdProjector implements MapFunction<Edge, 
Edge> {
        
                @Override
                public Edge map(Edge inEdge) throws Exception {
@@ -149,7 +149,7 @@ public class EnumTrianglesBasic {
         *  The first vertex of a triad is the shared vertex, the second and 
third vertex are ordered by vertexId. 
         *  Assumes that input edges share the first vertex and are in 
ascending order of the second vertex.
         */
-       private static class TriadBuilder extends GroupReduceFunction<Edge, 
Triad> {
+       private static class TriadBuilder implements GroupReduceFunction<Edge, 
Triad> {
                private final List<Integer> vertices = new ArrayList<Integer>();
                private final Triad outTriad = new Triad();
                
@@ -180,7 +180,7 @@ public class EnumTrianglesBasic {
        }
        
        /** Filters triads (three vertices connected by two edges) without a 
closing third edge. */
-       private static class TriadFilter extends JoinFunction<Triad, Edge, 
Triad> {
+       private static class TriadFilter implements JoinFunction<Triad, Edge, 
Triad> {
                
                @Override
                public Triad join(Triad triad, Edge edge) throws Exception {

Reply via email to