http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
index da0f222..0727d63 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericCombine;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.util.MutableToRegularIteratorWrapper;
@@ -37,13 +37,13 @@ import org.apache.flink.util.MutableObjectIterator;
  * The GroupReduceTask creates a iterator over all records from its input. The 
iterator returns all records grouped by their
  * key. The iterator is handed to the <code>reduce()</code> method of the 
GroupReduceFunction.
  * 
- * @see GenericGroupReduce
+ * @see org.apache.flink.api.common.functions.GroupReduceFunction
  */
-public class AllGroupReduceDriver<IT, OT> implements 
PactDriver<GenericGroupReduce<IT, OT>, OT> {
+public class AllGroupReduceDriver<IT, OT> implements 
PactDriver<GroupReduceFunction<IT, OT>, OT> {
        
        private static final Log LOG = 
LogFactory.getLog(AllGroupReduceDriver.class);
 
-       private PactTaskContext<GenericGroupReduce<IT, OT>, OT> taskContext;
+       private PactTaskContext<GroupReduceFunction<IT, OT>, OT> taskContext;
        
        private MutableObjectIterator<IT> input;
 
@@ -54,7 +54,7 @@ public class AllGroupReduceDriver<IT, OT> implements 
PactDriver<GenericGroupRedu
        // 
------------------------------------------------------------------------
 
        @Override
-       public void setup(PactTaskContext<GenericGroupReduce<IT, OT>, OT> 
context) {
+       public void setup(PactTaskContext<GroupReduceFunction<IT, OT>, OT> 
context) {
                this.taskContext = context;
        }
        
@@ -64,9 +64,9 @@ public class AllGroupReduceDriver<IT, OT> implements 
PactDriver<GenericGroupRedu
        }
 
        @Override
-       public Class<GenericGroupReduce<IT, OT>> getStubType() {
+       public Class<GroupReduceFunction<IT, OT>> getStubType() {
                @SuppressWarnings("unchecked")
-               final Class<GenericGroupReduce<IT, OT>> clazz = 
(Class<GenericGroupReduce<IT, OT>>) (Class<?>) GenericGroupReduce.class;
+               final Class<GroupReduceFunction<IT, OT>> clazz = 
(Class<GroupReduceFunction<IT, OT>>) (Class<?>) GroupReduceFunction.class;
                return clazz;
        }
 
@@ -83,8 +83,8 @@ public class AllGroupReduceDriver<IT, OT> implements 
PactDriver<GenericGroupRedu
                this.strategy = config.getDriverStrategy();
                
                if (strategy == DriverStrategy.ALL_GROUP_COMBINE) {
-                       if (!(this.taskContext.getStub() instanceof 
GenericCombine)) {
-                               throw new Exception("Using combiner on a UDF 
that does not implement the combiner interface " + 
GenericCombine.class.getName());
+                       if (!(this.taskContext.getStub() instanceof 
FlatCombineFunction)) {
+                               throw new Exception("Using combiner on a UDF 
that does not implement the combiner interface " + 
FlatCombineFunction.class.getName());
                        }
                }
                else if (strategy != DriverStrategy.ALL_GROUP_REDUCE) {
@@ -105,13 +105,13 @@ public class AllGroupReduceDriver<IT, OT> implements 
PactDriver<GenericGroupRedu
                // single UDF call with the single group
                if (inIter.hasNext()) {
                        if (strategy == DriverStrategy.ALL_GROUP_REDUCE) {
-                               final GenericGroupReduce<IT, OT> reducer = 
this.taskContext.getStub();
+                               final GroupReduceFunction<IT, OT> reducer = 
this.taskContext.getStub();
                                final Collector<OT> output = 
this.taskContext.getOutputCollector();
                                reducer.reduce(inIter, output);
                        }
                        else {
                                @SuppressWarnings("unchecked")
-                               final GenericCombine<IT> combiner = 
(GenericCombine<IT>) this.taskContext.getStub();
+                               final FlatCombineFunction<IT> combiner = 
(FlatCombineFunction<IT>) this.taskContext.getStub();
                                @SuppressWarnings("unchecked")
                                final Collector<IT> output = (Collector<IT>) 
this.taskContext.getOutputCollector();
                                combiner.combine(inIter, output);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
index 30bfae3..721f4f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.runtime.operators.util.TaskConfig;
@@ -35,13 +35,13 @@ import org.apache.flink.util.MutableObjectIterator;
  * The ReduceTask creates a iterator over all records from its input. The 
iterator returns all records grouped by their
  * key. The iterator is handed to the <code>reduce()</code> method of the 
ReduceFunction.
  * 
- * @see GenericReduce
+ * @see org.apache.flink.api.common.functions.ReduceFunction
  */
-public class AllReduceDriver<T> implements PactDriver<GenericReduce<T>, T> {
+public class AllReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
        
        private static final Log LOG = LogFactory.getLog(AllReduceDriver.class);
 
-       private PactTaskContext<GenericReduce<T>, T> taskContext;
+       private PactTaskContext<ReduceFunction<T>, T> taskContext;
        
        private MutableObjectIterator<T> input;
 
@@ -52,7 +52,7 @@ public class AllReduceDriver<T> implements 
PactDriver<GenericReduce<T>, T> {
        // 
------------------------------------------------------------------------
 
        @Override
-       public void setup(PactTaskContext<GenericReduce<T>, T> context) {
+       public void setup(PactTaskContext<ReduceFunction<T>, T> context) {
                this.taskContext = context;
                this.running = true;
        }
@@ -63,9 +63,9 @@ public class AllReduceDriver<T> implements 
PactDriver<GenericReduce<T>, T> {
        }
 
        @Override
-       public Class<GenericReduce<T>> getStubType() {
+       public Class<ReduceFunction<T>> getStubType() {
                @SuppressWarnings("unchecked")
-               final Class<GenericReduce<T>> clazz = (Class<GenericReduce<T>>) 
(Class<?>) GenericReduce.class;
+               final Class<ReduceFunction<T>> clazz = 
(Class<ReduceFunction<T>>) (Class<?>) ReduceFunction.class;
                return clazz;
        }
 
@@ -94,7 +94,7 @@ public class AllReduceDriver<T> implements 
PactDriver<GenericReduce<T>, T> {
                        LOG.debug(this.taskContext.formatLogString("AllReduce 
preprocessing done. Running Reducer code."));
                }
 
-               final GenericReduce<T> stub = this.taskContext.getStub();
+               final ReduceFunction<T> stub = this.taskContext.getStub();
                final MutableObjectIterator<T> input = this.input;
                final TypeSerializer<T> serializer = this.serializer;
                

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
index 3da451a..8ff0262 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -41,12 +41,12 @@ import org.apache.flink.util.MutableObjectIterator;
  * 
  * @see org.apache.flink.api.java.record.functions.CoGroupFunction
  */
-public class CoGroupDriver<IT1, IT2, OT> implements 
PactDriver<GenericCoGrouper<IT1, IT2, OT>, OT> {
+public class CoGroupDriver<IT1, IT2, OT> implements 
PactDriver<CoGroupFunction<IT1, IT2, OT>, OT> {
        
        private static final Log LOG = LogFactory.getLog(CoGroupDriver.class);
        
        
-       private PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> taskContext;
+       private PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext;
        
        private CoGroupTaskIterator<IT1, IT2> coGroupIterator;                  
        // the iterator that does the actual cogroup
        
@@ -56,7 +56,7 @@ public class CoGroupDriver<IT1, IT2, OT> implements 
PactDriver<GenericCoGrouper<
 
 
        @Override
-       public void setup(PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> 
context) {
+       public void setup(PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> 
context) {
                this.taskContext = context;
                this.running = true;
        }
@@ -69,9 +69,9 @@ public class CoGroupDriver<IT1, IT2, OT> implements 
PactDriver<GenericCoGrouper<
        
 
        @Override
-       public Class<GenericCoGrouper<IT1, IT2, OT>> getStubType() {
+       public Class<CoGroupFunction<IT1, IT2, OT>> getStubType() {
                @SuppressWarnings("unchecked")
-               final Class<GenericCoGrouper<IT1, IT2, OT>> clazz = 
(Class<GenericCoGrouper<IT1, IT2, OT>>) (Class<?>) GenericCoGrouper.class;
+               final Class<CoGroupFunction<IT1, IT2, OT>> clazz = 
(Class<CoGroupFunction<IT1, IT2, OT>>) (Class<?>) CoGroupFunction.class;
                return clazz;
        }
        
@@ -122,7 +122,7 @@ public class CoGroupDriver<IT1, IT2, OT> implements 
PactDriver<GenericCoGrouper<
        @Override
        public void run() throws Exception
        {
-               final GenericCoGrouper<IT1, IT2, OT> coGroupStub = 
this.taskContext.getStub();
+               final CoGroupFunction<IT1, IT2, OT> coGroupStub = 
this.taskContext.getStub();
                final Collector<OT> collector = 
this.taskContext.getOutputCollector();
                final CoGroupTaskIterator<IT1, IT2> coGroupIterator = 
this.coGroupIterator;
                

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
index 9d06618..8761a2e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
@@ -34,9 +34,9 @@ import org.apache.flink.runtime.util.KeyGroupedIterator;
 import org.apache.flink.runtime.util.SingleElementIterator;
 import org.apache.flink.util.Collector;
 
-public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements 
ResettablePactDriver<GenericCoGrouper<IT1, IT2, OT>, OT> {
+public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements 
ResettablePactDriver<CoGroupFunction<IT1, IT2, OT>, OT> {
        
-       private PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> taskContext;
+       private PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext;
        
        private CompactingHashTable<IT1> hashTable;
        
@@ -53,7 +53,7 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> 
implements Resettab
        // 
--------------------------------------------------------------------------------------------
        
        @Override
-       public void setup(PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> 
context) {
+       public void setup(PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> 
context) {
                this.taskContext = context;
                this.running = true;
        }
@@ -64,9 +64,9 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> 
implements Resettab
        }
        
        @Override
-       public Class<GenericCoGrouper<IT1, IT2, OT>> getStubType() {
+       public Class<CoGroupFunction<IT1, IT2, OT>> getStubType() {
                @SuppressWarnings("unchecked")
-               final Class<GenericCoGrouper<IT1, IT2, OT>> clazz = 
(Class<GenericCoGrouper<IT1, IT2, OT>>) (Class<?>) GenericCoGrouper.class;
+               final Class<CoGroupFunction<IT1, IT2, OT>> clazz = 
(Class<CoGroupFunction<IT1, IT2, OT>>) (Class<?>) CoGroupFunction.class;
                return clazz;
        }
        
@@ -123,7 +123,7 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, 
OT> implements Resettab
        @Override
        public void run() throws Exception {
 
-               final GenericCoGrouper<IT1, IT2, OT> coGroupStub = 
taskContext.getStub();
+               final CoGroupFunction<IT1, IT2, OT> coGroupStub = 
taskContext.getStub();
                final Collector<OT> collector = 
taskContext.getOutputCollector();
                
                IT1 buildSideRecord = solutionSideRecord;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
index 80fa855..f2020c7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import java.util.Iterator;
 
-import org.apache.flink.api.common.functions.GenericCoGrouper;
+import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
@@ -34,9 +34,9 @@ import org.apache.flink.runtime.util.KeyGroupedIterator;
 import org.apache.flink.runtime.util.SingleElementIterator;
 import org.apache.flink.util.Collector;
 
-public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements 
ResettablePactDriver<GenericCoGrouper<IT1, IT2, OT>, OT> {
+public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements 
ResettablePactDriver<CoGroupFunction<IT1, IT2, OT>, OT> {
        
-       private PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> taskContext;
+       private PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext;
        
        private CompactingHashTable<IT2> hashTable;
        
@@ -53,7 +53,7 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> 
implements Resetta
        // 
--------------------------------------------------------------------------------------------
        
        @Override
-       public void setup(PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> 
context) {
+       public void setup(PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> 
context) {
                this.taskContext = context;
                this.running = true;
        }
@@ -64,9 +64,9 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> 
implements Resetta
        }
        
        @Override
-       public Class<GenericCoGrouper<IT1, IT2, OT>> getStubType() {
+       public Class<CoGroupFunction<IT1, IT2, OT>> getStubType() {
                @SuppressWarnings("unchecked")
-               final Class<GenericCoGrouper<IT1, IT2, OT>> clazz = 
(Class<GenericCoGrouper<IT1, IT2, OT>>) (Class<?>) GenericCoGrouper.class;
+               final Class<CoGroupFunction<IT1, IT2, OT>> clazz = 
(Class<CoGroupFunction<IT1, IT2, OT>>) (Class<?>) CoGroupFunction.class;
                return clazz;
        }
        
@@ -123,7 +123,7 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, 
OT> implements Resetta
        @Override
        public void run() throws Exception {
 
-               final GenericCoGrouper<IT1, IT2, OT> coGroupStub = 
taskContext.getStub();
+               final CoGroupFunction<IT1, IT2, OT> coGroupStub = 
taskContext.getStub();
                final Collector<OT> collector = 
taskContext.getOutputCollector();
                
                IT2 buildSideRecord = solutionSideRecord;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
index 7c311ed..b68f24d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericCrosser;
+import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import 
org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator;
@@ -40,12 +40,12 @@ import org.apache.flink.util.MutableObjectIterator;
  * 
  * @see org.apache.flink.api.java.functions.CrossFunction
  */
-public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, 
T2, OT>, OT> {
+public class CrossDriver<T1, T2, OT> implements PactDriver<CrossFunction<T1, 
T2, OT>, OT> {
        
        private static final Log LOG = LogFactory.getLog(CrossDriver.class);
        
        
-       private PactTaskContext<GenericCrosser<T1, T2, OT>, OT> taskContext;
+       private PactTaskContext<CrossFunction<T1, T2, OT>, OT> taskContext;
        
        private MemoryManager memManager;
        
@@ -67,7 +67,7 @@ public class CrossDriver<T1, T2, OT> implements 
PactDriver<GenericCrosser<T1, T2
 
 
        @Override
-       public void setup(PactTaskContext<GenericCrosser<T1, T2, OT>, OT> 
context) {
+       public void setup(PactTaskContext<CrossFunction<T1, T2, OT>, OT> 
context) {
                this.taskContext = context;
                this.running = true;
        }
@@ -80,9 +80,9 @@ public class CrossDriver<T1, T2, OT> implements 
PactDriver<GenericCrosser<T1, T2
 
 
        @Override
-       public Class<GenericCrosser<T1, T2, OT>> getStubType() {
+       public Class<CrossFunction<T1, T2, OT>> getStubType() {
                @SuppressWarnings("unchecked")
-               final Class<GenericCrosser<T1, T2, OT>> clazz = 
(Class<GenericCrosser<T1, T2, OT>>) (Class<?>) GenericCrosser.class;
+               final Class<CrossFunction<T1, T2, OT>> clazz = 
(Class<CrossFunction<T1, T2, OT>>) (Class<?>) CrossFunction.class;
                return clazz;
        }
        
@@ -207,7 +207,7 @@ public class CrossDriver<T1, T2, OT> implements 
PactDriver<GenericCrosser<T1, T2
                final T2 val2Reuse = serializer2.createInstance();
                T2 val2Copy = serializer2.createInstance();
                
-               final GenericCrosser<T1, T2, OT> crosser = 
this.taskContext.getStub();
+               final CrossFunction<T1, T2, OT> crosser = 
this.taskContext.getStub();
                final Collector<OT> collector = 
this.taskContext.getOutputCollector();
                
                // for all blocks
@@ -217,7 +217,8 @@ public class CrossDriver<T1, T2, OT> implements 
PactDriver<GenericCrosser<T1, T2
                                // for all values in the block
                                while ((val1 = blockVals.next(val1Reuse)) != 
null) {
                                        val2Copy = serializer2.copy(val2, 
val2Copy);
-                                       crosser.cross(val1, val2Copy, 
collector);
+                                       
collector.collect(crosser.cross(val1,val2Copy));
+                                       //crosser.cross(val1, val2Copy, 
collector);
                                }
                                blockVals.reset();
                        }
@@ -254,7 +255,7 @@ public class CrossDriver<T1, T2, OT> implements 
PactDriver<GenericCrosser<T1, T2
                T2 val2;
                final T2 val2Reuse = serializer2.createInstance();
 
-               final GenericCrosser<T1, T2, OT> crosser = 
this.taskContext.getStub();
+               final CrossFunction<T1, T2, OT> crosser = 
this.taskContext.getStub();
                final Collector<OT> collector = 
this.taskContext.getOutputCollector();
                
                // for all blocks
@@ -264,7 +265,8 @@ public class CrossDriver<T1, T2, OT> implements 
PactDriver<GenericCrosser<T1, T2
                                // for all values in the block
                                while (this.running && ((val2 = 
blockVals.next(val2Reuse)) != null)) {
                                        val1Copy = serializer1.copy(val1, 
val1Copy);
-                                       crosser.cross(val1Copy, val2, 
collector);
+                                       
collector.collect(crosser.cross(val1Copy, val2));
+                                       //crosser.cross(val1Copy, val2, 
collector);
                                }
                                blockVals.reset();
                        }
@@ -296,7 +298,7 @@ public class CrossDriver<T1, T2, OT> implements 
PactDriver<GenericCrosser<T1, T2
                T2 val2;
                final T2 val2Reuse = serializer2.createInstance();
 
-               final GenericCrosser<T1, T2, OT> crosser = 
this.taskContext.getStub();
+               final CrossFunction<T1, T2, OT> crosser = 
this.taskContext.getStub();
                final Collector<OT> collector = 
this.taskContext.getOutputCollector();
                
                // for all blocks
@@ -304,7 +306,8 @@ public class CrossDriver<T1, T2, OT> implements 
PactDriver<GenericCrosser<T1, T2
                        // for all values from the spilling side
                        while (this.running && ((val2 = 
spillVals.next(val2Reuse)) != null)) {
                                val1Copy = serializer1.copy(val1, val1Copy);
-                               crosser.cross(val1Copy, val2, collector);
+                               collector.collect(crosser.cross(val1Copy, 
val2));
+                               //crosser.cross(val1Copy, val2, collector);
                        }
                        spillVals.reset();
                }
@@ -332,7 +335,7 @@ public class CrossDriver<T1, T2, OT> implements 
PactDriver<GenericCrosser<T1, T2
                final T2 val2Reuse = serializer2.createInstance();
                T2 val2Copy = serializer2.createInstance();
                
-               final GenericCrosser<T1, T2, OT> crosser = 
this.taskContext.getStub();
+               final CrossFunction<T1, T2, OT> crosser = 
this.taskContext.getStub();
                final Collector<OT> collector = 
this.taskContext.getOutputCollector();
                
                // for all blocks
@@ -340,7 +343,8 @@ public class CrossDriver<T1, T2, OT> implements 
PactDriver<GenericCrosser<T1, T2
                        // for all values from the spilling side
                        while (this.running && (val1 = 
spillVals.next(val1Reuse)) != null) {
                                val2Copy = serializer2.copy(val2, val2Copy);
-                               crosser.cross(val1, val2Copy, collector);
+                               collector.collect(crosser.cross(val1, 
val2Copy));
+                               //crosser.cross(val1, val2Copy, collector);
                        }
                        spillVals.reset();
                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
index 44f22a0..130601b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.runtime.operators;
 
-import org.apache.flink.api.common.functions.GenericFlatMap;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -31,20 +31,20 @@ import org.apache.flink.util.MutableObjectIterator;
  * The MapTask creates an iterator over all key-value pairs of its input and 
hands that to the <code>map()</code> method
  * of the MapFunction.
  * 
- * @see GenericFlatMap
+ * @see org.apache.flink.api.common.functions.FlatMapFunction
  * 
  * @param <IT> The mapper's input data type.
  * @param <OT> The mapper's output data type.
  */
-public class FlatMapDriver<IT, OT> implements PactDriver<GenericFlatMap<IT, 
OT>, OT> {
+public class FlatMapDriver<IT, OT> implements PactDriver<FlatMapFunction<IT, 
OT>, OT> {
        
-       private PactTaskContext<GenericFlatMap<IT, OT>, OT> taskContext;
+       private PactTaskContext<FlatMapFunction<IT, OT>, OT> taskContext;
        
        private volatile boolean running;
        
        
        @Override
-       public void setup(PactTaskContext<GenericFlatMap<IT, OT>, OT> context) {
+       public void setup(PactTaskContext<FlatMapFunction<IT, OT>, OT> context) 
{
                this.taskContext = context;
                this.running = true;
        }
@@ -55,9 +55,9 @@ public class FlatMapDriver<IT, OT> implements 
PactDriver<GenericFlatMap<IT, OT>,
        }
 
        @Override
-       public Class<GenericFlatMap<IT, OT>> getStubType() {
+       public Class<FlatMapFunction<IT, OT>> getStubType() {
                @SuppressWarnings("unchecked")
-               final Class<GenericFlatMap<IT, OT>> clazz = 
(Class<GenericFlatMap<IT, OT>>) (Class<?>) GenericFlatMap.class;
+               final Class<FlatMapFunction<IT, OT>> clazz = 
(Class<FlatMapFunction<IT, OT>>) (Class<?>) FlatMapFunction.class;
                return clazz;
        }
 
@@ -75,7 +75,7 @@ public class FlatMapDriver<IT, OT> implements 
PactDriver<GenericFlatMap<IT, OT>,
        public void run() throws Exception {
                // cache references on the stack
                final MutableObjectIterator<IT> input = 
this.taskContext.getInput(0);
-               final GenericFlatMap<IT, OT> function = 
this.taskContext.getStub();
+               final FlatMapFunction<IT, OT> function = 
this.taskContext.getStub();
                final Collector<OT> output = 
this.taskContext.getOutputCollector();
 
                IT record = 
this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index 1d0749c..f786c56 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericCombine;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -39,12 +39,12 @@ import org.apache.flink.util.MutableObjectIterator;
  * 
  * @param <T> The data type consumed and produced by the combiner.
  */
-public class GroupReduceCombineDriver<T> implements 
PactDriver<GenericCombine<T>, T> {
+public class GroupReduceCombineDriver<T> implements 
PactDriver<FlatCombineFunction<T>, T> {
        
        private static final Log LOG = 
LogFactory.getLog(GroupReduceCombineDriver.class);
 
        
-       private PactTaskContext<GenericCombine<T>, T> taskContext;
+       private PactTaskContext<FlatCombineFunction<T>, T> taskContext;
        
        private CloseableInputProvider<T> input;
 
@@ -57,7 +57,7 @@ public class GroupReduceCombineDriver<T> implements 
PactDriver<GenericCombine<T>
        // 
------------------------------------------------------------------------
 
        @Override
-       public void setup(PactTaskContext<GenericCombine<T>, T> context) {
+       public void setup(PactTaskContext<FlatCombineFunction<T>, T> context) {
                this.taskContext = context;
                this.running = true;
        }
@@ -68,9 +68,9 @@ public class GroupReduceCombineDriver<T> implements 
PactDriver<GenericCombine<T>
        }
 
        @Override
-       public Class<GenericCombine<T>> getStubType() {
+       public Class<FlatCombineFunction<T>> getStubType() {
                @SuppressWarnings("unchecked")
-               final Class<GenericCombine<T>> clazz = 
(Class<GenericCombine<T>>) (Class<?>) GenericCombine.class;
+               final Class<FlatCombineFunction<T>> clazz = 
(Class<FlatCombineFunction<T>>) (Class<?>) FlatCombineFunction.class;
                return clazz;
        }
 
@@ -111,7 +111,7 @@ public class GroupReduceCombineDriver<T> implements 
PactDriver<GenericCombine<T>
                                this.serializerFactory.getSerializer(), 
this.comparator);
 
                // cache references on the stack
-               final GenericCombine<T> stub = this.taskContext.getStub();
+               final FlatCombineFunction<T> stub = this.taskContext.getStub();
                final Collector<T> output = 
this.taskContext.getOutputCollector();
 
                // run stub implementation

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
index 1ab080c..960143d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericGroupReduce;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.operators.util.TaskConfig;
@@ -37,13 +37,13 @@ import org.apache.flink.util.MutableObjectIterator;
  * The GroupReduceTask creates an iterator over all records from its input. The 
iterator returns all records grouped by their
  * key. The iterator is handed to the <code>reduce()</code> method of the 
GroupReduceFunction.
  * 
- * @see GenericGroupReduce
+ * @see org.apache.flink.api.common.functions.GroupReduceFunction
  */
-public class GroupReduceDriver<IT, OT> implements 
PactDriver<GenericGroupReduce<IT, OT>, OT> {
+public class GroupReduceDriver<IT, OT> implements 
PactDriver<GroupReduceFunction<IT, OT>, OT> {
        
        private static final Log LOG = 
LogFactory.getLog(GroupReduceDriver.class);
 
-       private PactTaskContext<GenericGroupReduce<IT, OT>, OT> taskContext;
+       private PactTaskContext<GroupReduceFunction<IT, OT>, OT> taskContext;
        
        private MutableObjectIterator<IT> input;
 
@@ -56,7 +56,7 @@ public class GroupReduceDriver<IT, OT> implements 
PactDriver<GenericGroupReduce<
        // 
------------------------------------------------------------------------
 
        @Override
-       public void setup(PactTaskContext<GenericGroupReduce<IT, OT>, OT> 
context) {
+       public void setup(PactTaskContext<GroupReduceFunction<IT, OT>, OT> 
context) {
                this.taskContext = context;
                this.running = true;
        }
@@ -67,9 +67,9 @@ public class GroupReduceDriver<IT, OT> implements 
PactDriver<GenericGroupReduce<
        }
 
        @Override
-       public Class<GenericGroupReduce<IT, OT>> getStubType() {
+       public Class<GroupReduceFunction<IT, OT>> getStubType() {
                @SuppressWarnings("unchecked")
-               final Class<GenericGroupReduce<IT, OT>> clazz = 
(Class<GenericGroupReduce<IT, OT>>) (Class<?>) GenericGroupReduce.class;
+               final Class<GroupReduceFunction<IT, OT>> clazz = 
(Class<GroupReduceFunction<IT, OT>>) (Class<?>) GroupReduceFunction.class;
                return clazz;
        }
 
@@ -100,7 +100,7 @@ public class GroupReduceDriver<IT, OT> implements 
PactDriver<GenericGroupReduce<
                final KeyGroupedIterator<IT> iter = new 
KeyGroupedIterator<IT>(this.input, this.serializer, this.comparator);
 
                // cache references on the stack
-               final GenericGroupReduce<IT, OT> stub = 
this.taskContext.getStub();
+               final GroupReduceFunction<IT, OT> stub = 
this.taskContext.getStub();
                final Collector<OT> output = 
this.taskContext.getOutputCollector();
 
                // run stub implementation

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
index b23b0cd..342f307 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.runtime.operators;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
@@ -32,9 +32,9 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements 
ResettablePactDriver<GenericJoiner<IT1, IT2, OT>, OT> {
+public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements 
ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
        
-       private PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> taskContext;
+       private PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
        
        private CompactingHashTable<IT1> hashTable;
        
@@ -50,7 +50,7 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> 
implements ResettableP
        // 
--------------------------------------------------------------------------------------------
        
        @Override
-       public void setup(PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> 
context) {
+       public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> 
context) {
                this.taskContext = context;
                this.running = true;
        }
@@ -61,9 +61,9 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> 
implements ResettableP
        }
        
        @Override
-       public Class<GenericJoiner<IT1, IT2, OT>> getStubType() {
+       public Class<FlatJoinFunction<IT1, IT2, OT>> getStubType() {
                @SuppressWarnings("unchecked")
-               final Class<GenericJoiner<IT1, IT2, OT>> clazz = 
(Class<GenericJoiner<IT1, IT2, OT>>) (Class<?>) GenericJoiner.class;
+               final Class<FlatJoinFunction<IT1, IT2, OT>> clazz = 
(Class<FlatJoinFunction<IT1, IT2, OT>>) (Class<?>) FlatJoinFunction.class;
                return clazz;
        }
        
@@ -126,7 +126,7 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> 
implements ResettableP
        @Override
        public void run() throws Exception {
 
-               final GenericJoiner<IT1, IT2, OT> joinFunction = 
taskContext.getStub();
+               final FlatJoinFunction<IT1, IT2, OT> joinFunction = 
taskContext.getStub();
                final Collector<OT> collector = 
taskContext.getOutputCollector();
                
                IT1 buildSideRecord = this.solutionSideRecord;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
index 4fa5c5a..c38a81a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.runtime.operators;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
@@ -32,9 +32,9 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements 
ResettablePactDriver<GenericJoiner<IT1, IT2, OT>, OT> {
+public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements 
ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
        
-       private PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> taskContext;
+       private PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
        
        private CompactingHashTable<IT2> hashTable;
        
@@ -50,7 +50,7 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> 
implements Resettable
        // 
--------------------------------------------------------------------------------------------
        
        @Override
-       public void setup(PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> 
context) {
+       public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> 
context) {
                this.taskContext = context;
                this.running = true;
        }
@@ -61,9 +61,9 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> 
implements Resettable
        }
        
        @Override
-       public Class<GenericJoiner<IT1, IT2, OT>> getStubType() {
+       public Class<FlatJoinFunction<IT1, IT2, OT>> getStubType() {
                @SuppressWarnings("unchecked")
-               final Class<GenericJoiner<IT1, IT2, OT>> clazz = 
(Class<GenericJoiner<IT1, IT2, OT>>) (Class<?>) GenericJoiner.class;
+               final Class<FlatJoinFunction<IT1, IT2, OT>> clazz = 
(Class<FlatJoinFunction<IT1, IT2, OT>>) (Class<?>) FlatJoinFunction.class;
                return clazz;
        }
        
@@ -126,7 +126,7 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> 
implements Resettable
        @Override
        public void run() throws Exception {
 
-               final GenericJoiner<IT1, IT2, OT> joinFunction = 
taskContext.getStub();
+               final FlatJoinFunction<IT1, IT2, OT> joinFunction = 
taskContext.getStub();
                final Collector<OT> collector = 
taskContext.getOutputCollector();
                
                IT2 buildSideRecord = this.solutionSideRecord;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
index fe1e0c1..89fbf4d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.runtime.operators;
 
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -31,20 +31,20 @@ import org.apache.flink.util.MutableObjectIterator;
  * The MapTask creates an iterator over all key-value pairs of its input and 
hands that to the <code>map()</code> method
  * of the MapFunction.
  * 
- * @see GenericMap
+ * @see org.apache.flink.api.common.functions.MapFunction
  * 
  * @param <IT> The mapper's input data type.
  * @param <OT> The mapper's output data type.
  */
-public class MapDriver<IT, OT> implements PactDriver<GenericMap<IT, OT>, OT> {
+public class MapDriver<IT, OT> implements PactDriver<MapFunction<IT, OT>, OT> {
        
-       private PactTaskContext<GenericMap<IT, OT>, OT> taskContext;
+       private PactTaskContext<MapFunction<IT, OT>, OT> taskContext;
        
        private volatile boolean running;
        
        
        @Override
-       public void setup(PactTaskContext<GenericMap<IT, OT>, OT> context) {
+       public void setup(PactTaskContext<MapFunction<IT, OT>, OT> context) {
                this.taskContext = context;
                this.running = true;
        }
@@ -55,9 +55,9 @@ public class MapDriver<IT, OT> implements 
PactDriver<GenericMap<IT, OT>, OT> {
        }
 
        @Override
-       public Class<GenericMap<IT, OT>> getStubType() {
+       public Class<MapFunction<IT, OT>> getStubType() {
                @SuppressWarnings("unchecked")
-               final Class<GenericMap<IT, OT>> clazz = (Class<GenericMap<IT, 
OT>>) (Class<?>) GenericMap.class;
+               final Class<MapFunction<IT, OT>> clazz = (Class<MapFunction<IT, 
OT>>) (Class<?>) MapFunction.class;
                return clazz;
        }
 
@@ -75,7 +75,7 @@ public class MapDriver<IT, OT> implements 
PactDriver<GenericMap<IT, OT>, OT> {
        public void run() throws Exception {
                // cache references on the stack
                final MutableObjectIterator<IT> input = 
this.taskContext.getInput(0);
-               final GenericMap<IT, OT> function = this.taskContext.getStub();
+               final MapFunction<IT, OT> function = this.taskContext.getStub();
                final Collector<OT> output = 
this.taskContext.getOutputCollector();
 
                IT record = 
this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
index a29afa2..e205f1e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -42,13 +42,13 @@ import org.apache.flink.util.MutableObjectIterator;
  * The MatchTask matches all pairs of records that share the same key and come 
from different inputs. Each pair of 
  * matching records is handed to the <code>match()</code> method of the 
JoinFunction.
  * 
- * @see GenericJoiner
+ * @see org.apache.flink.api.common.functions.FlatJoinFunction
  */
-public class MatchDriver<IT1, IT2, OT> implements 
PactDriver<GenericJoiner<IT1, IT2, OT>, OT> {
+public class MatchDriver<IT1, IT2, OT> implements 
PactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
        
        protected static final Log LOG = LogFactory.getLog(MatchDriver.class);
        
-       protected PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> taskContext;
+       protected PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> 
taskContext;
        
        private volatile JoinTaskIterator<IT1, IT2, OT> matchIterator;          
// the iterator that does the actual matching
        
@@ -57,7 +57,7 @@ public class MatchDriver<IT1, IT2, OT> implements 
PactDriver<GenericJoiner<IT1,
        // 
------------------------------------------------------------------------
 
        @Override
-       public void setup(PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> 
context) {
+       public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> 
context) {
                this.taskContext = context;
                this.running = true;
        }
@@ -68,9 +68,9 @@ public class MatchDriver<IT1, IT2, OT> implements 
PactDriver<GenericJoiner<IT1,
        }
 
        @Override
-       public Class<GenericJoiner<IT1, IT2, OT>> getStubType() {
+       public Class<FlatJoinFunction<IT1, IT2, OT>> getStubType() {
                @SuppressWarnings("unchecked")
-               final Class<GenericJoiner<IT1, IT2, OT>> clazz = 
(Class<GenericJoiner<IT1, IT2, OT>>) (Class<?>) GenericJoiner.class;
+               final Class<FlatJoinFunction<IT1, IT2, OT>> clazz = 
(Class<FlatJoinFunction<IT1, IT2, OT>>) (Class<?>) FlatJoinFunction.class;
                return clazz;
        }
        
@@ -141,7 +141,7 @@ public class MatchDriver<IT1, IT2, OT> implements 
PactDriver<GenericJoiner<IT1,
 
        @Override
        public void run() throws Exception {
-               final GenericJoiner<IT1, IT2, OT> matchStub = 
this.taskContext.getStub();
+               final FlatJoinFunction<IT1, IT2, OT> matchStub = 
this.taskContext.getStub();
                final Collector<OT> collector = 
this.taskContext.getOutputCollector();
                final JoinTaskIterator<IT1, IT2, OT> matchIterator = 
this.matchIterator;
                

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
index ffe27e6..33d8a18 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.runtime.operators;
 
-import org.apache.flink.api.common.functions.AbstractFunction;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -28,15 +28,15 @@ import org.apache.flink.util.MutableObjectIterator;
  * 
  * @param <T> The data type.
  */
-public class NoOpDriver<T> implements PactDriver<AbstractFunction, T> {
+public class NoOpDriver<T> implements PactDriver<AbstractRichFunction, T> {
        
-       private PactTaskContext<AbstractFunction, T> taskContext;
+       private PactTaskContext<AbstractRichFunction, T> taskContext;
        
        private volatile boolean running;
        
        
        @Override
-       public void setup(PactTaskContext<AbstractFunction, T> context) {
+       public void setup(PactTaskContext<AbstractRichFunction, T> context) {
                this.taskContext = context;
                this.running = true;
        }
@@ -47,7 +47,7 @@ public class NoOpDriver<T> implements 
PactDriver<AbstractFunction, T> {
        }
        
        @Override
-       public Class<AbstractFunction> getStubType() {
+       public Class<AbstractRichFunction> getStubType() {
                return null;
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
index 4d72085..87cea30 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
@@ -24,7 +24,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -44,7 +44,7 @@ import org.apache.flink.util.MutableObjectIterator;
  * 
  * @param <T> The data type consumed and produced by the combiner.
  */
-public class ReduceCombineDriver<T> implements PactDriver<GenericReduce<T>, T> 
{
+public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, 
T> {
        
        private static final Log LOG = 
LogFactory.getLog(ReduceCombineDriver.class);
 
@@ -52,13 +52,13 @@ public class ReduceCombineDriver<T> implements 
PactDriver<GenericReduce<T>, T> {
        private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
        
        
-       private PactTaskContext<GenericReduce<T>, T> taskContext;
+       private PactTaskContext<ReduceFunction<T>, T> taskContext;
 
        private TypeSerializer<T> serializer;
 
        private TypeComparator<T> comparator;
        
-       private GenericReduce<T> reducer;
+       private ReduceFunction<T> reducer;
        
        private Collector<T> output;
        
@@ -75,7 +75,7 @@ public class ReduceCombineDriver<T> implements 
PactDriver<GenericReduce<T>, T> {
        // 
------------------------------------------------------------------------
 
        @Override
-       public void setup(PactTaskContext<GenericReduce<T>, T> context) {
+       public void setup(PactTaskContext<ReduceFunction<T>, T> context) {
                this.taskContext = context;
                this.running = true;
        }
@@ -86,9 +86,9 @@ public class ReduceCombineDriver<T> implements 
PactDriver<GenericReduce<T>, T> {
        }
 
        @Override
-       public Class<GenericReduce<T>> getStubType() {
+       public Class<ReduceFunction<T>> getStubType() {
                @SuppressWarnings("unchecked")
-               final Class<GenericReduce<T>> clazz = (Class<GenericReduce<T>>) 
(Class<?>) GenericReduce.class;
+               final Class<ReduceFunction<T>> clazz = 
(Class<ReduceFunction<T>>) (Class<?>) ReduceFunction.class;
                return clazz;
        }
 
@@ -168,7 +168,7 @@ public class ReduceCombineDriver<T> implements 
PactDriver<GenericReduce<T>, T> {
                        final TypeSerializer<T> serializer = this.serializer;
                        final TypeComparator<T> comparator = this.comparator;
                        
-                       final GenericReduce<T> function = this.reducer;
+                       final ReduceFunction<T> function = this.reducer;
                        
                        final Collector<T> output = this.output;
                        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
index a7e9305..9495cdc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericReduce;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.operators.util.TaskConfig;
@@ -36,13 +36,13 @@ import org.apache.flink.util.MutableObjectIterator;
  * The ReduceTask creates an iterator over all records from its input. The 
iterator returns all records grouped by their
  * key. The iterator is handed to the <code>reduce()</code> method of the 
ReduceFunction.
  * 
- * @see GenericReduce
+ * @see org.apache.flink.api.common.functions.ReduceFunction
  */
-public class ReduceDriver<T> implements PactDriver<GenericReduce<T>, T> {
+public class ReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
        
        private static final Log LOG = LogFactory.getLog(ReduceDriver.class);
 
-       private PactTaskContext<GenericReduce<T>, T> taskContext;
+       private PactTaskContext<ReduceFunction<T>, T> taskContext;
        
        private MutableObjectIterator<T> input;
 
@@ -55,7 +55,7 @@ public class ReduceDriver<T> implements 
PactDriver<GenericReduce<T>, T> {
        // 
------------------------------------------------------------------------
 
        @Override
-       public void setup(PactTaskContext<GenericReduce<T>, T> context) {
+       public void setup(PactTaskContext<ReduceFunction<T>, T> context) {
                this.taskContext = context;
                this.running = true;
        }
@@ -66,9 +66,9 @@ public class ReduceDriver<T> implements 
PactDriver<GenericReduce<T>, T> {
        }
 
        @Override
-       public Class<GenericReduce<T>> getStubType() {
+       public Class<ReduceFunction<T>> getStubType() {
                @SuppressWarnings("unchecked")
-               final Class<GenericReduce<T>> clazz = (Class<GenericReduce<T>>) 
(Class<?>) GenericReduce.class;
+               final Class<ReduceFunction<T>> clazz = 
(Class<ReduceFunction<T>>) (Class<?>) ReduceFunction.class;
                return clazz;
        }
 
@@ -101,7 +101,7 @@ public class ReduceDriver<T> implements 
PactDriver<GenericReduce<T>, T> {
                final TypeSerializer<T> serializer = this.serializer;
                final TypeComparator<T> comparator = this.comparator;
                
-               final GenericReduce<T> function = this.taskContext.getStub();
+               final ReduceFunction<T> function = this.taskContext.getStub();
                
                final Collector<T> output = 
this.taskContext.getOutputCollector();
                

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 45fe05a..c8f217c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -24,8 +24,9 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GenericCombine;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -497,7 +498,7 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                        if (this.stub != null) {
                                try {
                                        Configuration stubConfig = 
this.config.getStubParameters();
-                                       this.stub.open(stubConfig);
+                                       FunctionUtils.openFunction(this.stub, 
stubConfig);
                                        stubOpen = true;
                                }
                                catch (Throwable t) {
@@ -510,7 +511,7 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
 
                        // close. We close here such that a regular close 
throwing an exception marks a task as failed.
                        if (this.running && this.stub != null) {
-                               this.stub.close();
+                               FunctionUtils.closeFunction(this.stub);
                                stubOpen = false;
                        }
 
@@ -525,15 +526,17 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                        // modify accumulators.
                        if (this.stub != null) {
                                // collect the counters from the stub
-                               Map<String, Accumulator<?,?>> accumulators = 
this.stub.getRuntimeContext().getAllAccumulators();
-                               
RegularPactTask.reportAndClearAccumulators(getEnvironment(), accumulators, 
this.chainedTasks);
+                               if 
(FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext) != 
null) {
+                                       Map<String, Accumulator<?, ?>> 
accumulators = FunctionUtils.getFunctionRuntimeContext(this.stub, 
this.runtimeUdfContext).getAllAccumulators();
+                                       
RegularPactTask.reportAndClearAccumulators(getEnvironment(), accumulators, 
this.chainedTasks);
+                               }
                        }
                }
                catch (Exception ex) {
                        // close the input, but do not report any exceptions, 
since we already have another root cause
                        if (stubOpen) {
                                try {
-                                       this.stub.close();
+                                       FunctionUtils.closeFunction(this.stub);
                                }
                                catch (Throwable t) {}
                        }
@@ -582,9 +585,12 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                // tasks. Type conflicts can occur here if counters with same 
name but
                // different type were used.
 
+
                for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
-                       Map<String, Accumulator<?, ?>> chainedAccumulators = 
chainedTask.getStub().getRuntimeContext().getAllAccumulators();
-                       AccumulatorHelper.mergeInto(accumulators, 
chainedAccumulators);
+                       if 
(FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) {
+                               Map<String, Accumulator<?, ?>> 
chainedAccumulators = 
FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), 
null).getAllAccumulators();
+                               AccumulatorHelper.mergeInto(accumulators, 
chainedAccumulators);
+                       }
                }
 
                // Don't report if the UDF didn't collect any accumulators
@@ -607,7 +613,9 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                // done before sending
                AccumulatorHelper.resetAndClearAccumulators(accumulators);
                for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
-                       
AccumulatorHelper.resetAndClearAccumulators(chainedTask.getStub().getRuntimeContext().getAllAccumulators());
+                       if 
(FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) {
+                               
AccumulatorHelper.resetAndClearAccumulators(FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(),
 null).getAllAccumulators());
+                       }
                }
        }
 
@@ -693,7 +701,7 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                                throw new RuntimeException("The class '" + 
stub.getClass().getName() + "' is not a subclass of '" + 
                                                stubSuperClass.getName() + "' 
as is required.");
                        }
-                       stub.setRuntimeContext(this.runtimeUdfContext);
+                       FunctionUtils.setFunctionRuntimeContext(stub, 
this.runtimeUdfContext);
                        return stub;
                }
                catch (ClassCastException ccex) {
@@ -988,13 +996,13 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                                                e.getMessage() == null ? "." : 
": " + e.getMessage(), e);
                                }
                                
-                               if (!(localStub instanceof GenericCombine)) {
+                               if (!(localStub instanceof 
FlatCombineFunction)) {
                                        throw new 
IllegalStateException("Performing combining sort outside a reduce task!");
                                }
 
                                @SuppressWarnings({ "rawtypes", "unchecked" })
                                CombiningUnilateralSortMerger<?> cSorter = new 
CombiningUnilateralSortMerger(
-                                       (GenericCombine) localStub, 
getMemoryManager(), getIOManager(), this.inputIterators[inputNum], 
+                                       (FlatCombineFunction) localStub, 
getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
                                        this, this.inputSerializers[inputNum], 
getLocalStrategyComparator(inputNum),
                                        
this.config.getRelativeMemoryInput(inputNum), 
this.config.getFilehandlesInput(inputNum),
                                        
this.config.getSpillingThresholdInput(inputNum));
@@ -1375,7 +1383,7 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
        // 
--------------------------------------------------------------------------------------------
        
        /**
-        * Opens the given stub using its {@link Function#open(Configuration)} 
method. If the open call produces
+        * Opens the given stub using its {@link 
org.apache.flink.api.common.functions.RichFunction#open(Configuration)} method. 
If the open call produces
         * an exception, a new exception with a standard error message is 
created, using the encountered exception
         * as its cause.
         * 
@@ -1386,14 +1394,14 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
         */
        public static void openUserCode(Function stub, Configuration 
parameters) throws Exception {
                try {
-                       stub.open(parameters);
+                       FunctionUtils.openFunction(stub, parameters);
                } catch (Throwable t) {
                        throw new Exception("The user defined 
'open(Configuration)' method in " + stub.getClass().toString() + " caused an 
exception: " + t.getMessage(), t);
                }
        }
        
        /**
-        * Closes the given stub using its {@link Function#close()} method. If 
the close call produces
+        * Closes the given stub using its {@link 
org.apache.flink.api.common.functions.RichFunction#close()} method. If the 
close call produces
         * an exception, a new exception with a standard error message is 
created, using the encountered exception
         * as its cause.
         * 
@@ -1403,7 +1411,7 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
         */
        public static void closeUserCode(Function stub) throws Exception {
                try {
-                       stub.close();
+                       FunctionUtils.closeFunction(stub);
                } catch (Throwable t) {
                        throw new Exception("The user defined 'close()' method 
caused an exception: " + t.getMessage(), t);
                }
@@ -1505,4 +1513,6 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                }
                return a;
        }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
index 3f7ad61..3dbab78 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.runtime.operators.chaining;
 
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.GenericCollectorMap;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -61,7 +61,7 @@ public class ChainedCollectorMapDriver<IT, OT> extends 
ChainedDriver<IT, OT> {
 
        // 
--------------------------------------------------------------------------------------------
 
-       public Function getStub() {
+       public RichFunction getStub() {
                return this.mapper;
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
index cca6838..db134a8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
@@ -19,25 +19,26 @@
 
 package org.apache.flink.runtime.operators.chaining;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GenericFlatMap;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.RegularPactTask;
 
 public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 
-       private GenericFlatMap<IT, OT> mapper;
+       private FlatMapFunction<IT, OT> mapper;
 
        // 
--------------------------------------------------------------------------------------------
 
        @Override
        public void setup(AbstractInvokable parent) {
                @SuppressWarnings("unchecked")
-               final GenericFlatMap<IT, OT> mapper =
-                       RegularPactTask.instantiateUserCode(this.config, 
userCodeClassLoader, GenericFlatMap.class);
+               final FlatMapFunction<IT, OT> mapper =
+                       RegularPactTask.instantiateUserCode(this.config, 
userCodeClassLoader, FlatMapFunction.class);
                this.mapper = mapper;
-               mapper.setRuntimeContext(getUdfRuntimeContext());
+               FunctionUtils.setFunctionRuntimeContext(mapper, 
getUdfRuntimeContext());
        }
 
        @Override
@@ -54,8 +55,9 @@ public class ChainedFlatMapDriver<IT, OT> extends 
ChainedDriver<IT, OT> {
        @Override
        public void cancelTask() {
                try {
-                       this.mapper.close();
-               } catch (Throwable t) {
+                       FunctionUtils.closeFunction(this.mapper);
+               }
+               catch (Throwable t) {
                }
        }
 
@@ -84,4 +86,5 @@ public class ChainedFlatMapDriver<IT, OT> extends 
ChainedDriver<IT, OT> {
        public void close() {
                this.outputCollector.close();
        }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
index 3ae324c..a7b1048 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
@@ -20,24 +20,25 @@
 package org.apache.flink.runtime.operators.chaining;
 
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GenericMap;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.RegularPactTask;
 
 public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 
-       private GenericMap<IT, OT> mapper;
+       private MapFunction<IT, OT> mapper;
 
        // 
--------------------------------------------------------------------------------------------
 
        @Override
        public void setup(AbstractInvokable parent) {
                @SuppressWarnings("unchecked")
-               final GenericMap<IT, OT> mapper =
-                       RegularPactTask.instantiateUserCode(this.config, 
userCodeClassLoader, GenericMap.class);
+               final MapFunction<IT, OT> mapper =
+                       RegularPactTask.instantiateUserCode(this.config, 
userCodeClassLoader, MapFunction.class);
                this.mapper = mapper;
-               mapper.setRuntimeContext(getUdfRuntimeContext());
+               FunctionUtils.setFunctionRuntimeContext(mapper, 
getUdfRuntimeContext());
        }
 
        @Override
@@ -54,7 +55,7 @@ public class ChainedMapDriver<IT, OT> extends 
ChainedDriver<IT, OT> {
        @Override
        public void cancelTask() {
                try {
-                       this.mapper.close();
+                       FunctionUtils.closeFunction(this.mapper);
                } catch (Throwable t) {
                }
        }
@@ -84,4 +85,5 @@ public class ChainedMapDriver<IT, OT> extends 
ChainedDriver<IT, OT> {
        public void close() {
                this.outputCollector.close();
        }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
index 76d860b..3a42469 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.runtime.operators.chaining;
 
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.operators.base.BulkIterationBase;
 import 
org.apache.flink.api.common.operators.base.BulkIterationBase.TerminationCriterionAggregator;
@@ -47,7 +47,7 @@ public class ChainedTerminationCriterionDriver<IT, OT> 
extends ChainedDriver<IT,
 
        // 
--------------------------------------------------------------------------------------------
 
-       public Function getStub() {
+       public RichFunction getStub() {
                return null;
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
index d5ce0a7..ffac151 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
@@ -22,8 +22,9 @@ package org.apache.flink.runtime.operators.chaining;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.GenericCombine;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -51,7 +52,7 @@ public class SynchronousChainedCombineDriver<T> extends 
ChainedDriver<T, T> {
 
        private InMemorySorter<T> sorter;
 
-       private GenericCombine<T> combiner;
+       private FlatCombineFunction<T> combiner;
 
        private TypeSerializer<T> serializer;
 
@@ -72,10 +73,10 @@ public class SynchronousChainedCombineDriver<T> extends 
ChainedDriver<T, T> {
                this.parent = parent;
 
                @SuppressWarnings("unchecked")
-               final GenericCombine<T> combiner =
-                       RegularPactTask.instantiateUserCode(this.config, 
userCodeClassLoader, GenericCombine.class);
+               final FlatCombineFunction<T> combiner =
+                       RegularPactTask.instantiateUserCode(this.config, 
userCodeClassLoader, FlatCombineFunction.class);
                this.combiner = combiner;
-               combiner.setRuntimeContext(getUdfRuntimeContext());
+               FunctionUtils.setFunctionRuntimeContext(combiner, 
getUdfRuntimeContext());
        }
 
        @Override
@@ -185,7 +186,7 @@ public class SynchronousChainedCombineDriver<T> extends 
ChainedDriver<T, T> {
                                this.comparator);
 
                        // cache references on the stack
-                       final GenericCombine<T> stub = this.combiner;
+                       final FlatCombineFunction<T> stub = this.combiner;
                        final Collector<T> output = this.outputCollector;
 
                        // run stub implementation

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildFirstHashMatchIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildFirstHashMatchIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildFirstHashMatchIterator.java
index 65af050..0daf69b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildFirstHashMatchIterator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildFirstHashMatchIterator.java
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.operators.hash;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -101,7 +101,7 @@ public class BuildFirstHashMatchIterator<V1, V2, O> 
implements JoinTaskIterator<
        }
 
        @Override
-       public final boolean callWithNextKey(GenericJoiner<V1, V2, O> 
matchFunction, Collector<O> collector)
+       public final boolean callWithNextKey(FlatJoinFunction<V1, V2, O> 
matchFunction, Collector<O> collector)
        throws Exception
        {
                if (this.hashJoin.nextRecord())

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildSecondHashMatchIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildSecondHashMatchIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildSecondHashMatchIterator.java
index e124201..70e5afb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildSecondHashMatchIterator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildSecondHashMatchIterator.java
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.operators.hash;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -100,7 +100,7 @@ public class BuildSecondHashMatchIterator<V1, V2, O> 
implements JoinTaskIterator
        }
 
        @Override
-       public boolean callWithNextKey(GenericJoiner<V1, V2, O> matchFunction, 
Collector<O> collector)
+       public boolean callWithNextKey(FlatJoinFunction<V1, V2, O> 
matchFunction, Collector<O> collector)
        throws Exception
        {
                if (this.hashJoin.nextRecord())

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index 06d0ac7..da8a11b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -29,7 +29,8 @@ import java.util.Queue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericCombine;
+import org.apache.flink.api.common.functions.FlatCombineFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -71,7 +72,7 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
         */
        private static final Log LOG = 
LogFactory.getLog(CombiningUnilateralSortMerger.class);
 
-       private final GenericCombine<E> combineStub;    // the user code stub 
that does the combining
+       private final FlatCombineFunction<E> combineStub;       // the user 
code stub that does the combining
        
        private Configuration udfConfig;
        
@@ -101,7 +102,7 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
         * @throws MemoryAllocationException Thrown, if not enough memory can 
be obtained from the memory manager to
         *                                   perform the sort.
         */
-       public CombiningUnilateralSortMerger(GenericCombine<E> combineStub, 
MemoryManager memoryManager, IOManager ioManager,
+       public CombiningUnilateralSortMerger(FlatCombineFunction<E> 
combineStub, MemoryManager memoryManager, IOManager ioManager,
                        MutableObjectIterator<E> input, AbstractInvokable 
parentTask, 
                        TypeSerializerFactory<E> serializerFactory, 
TypeComparator<E> comparator,
                        double memoryFraction, int maxNumFileHandles, float 
startSpillingFraction)
@@ -133,7 +134,7 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
         * @throws MemoryAllocationException Thrown, if not enough memory can 
be obtained from the memory manager to
         *                                   perform the sort.
         */
-       public CombiningUnilateralSortMerger(GenericCombine<E> combineStub, 
MemoryManager memoryManager, IOManager ioManager,
+       public CombiningUnilateralSortMerger(FlatCombineFunction<E> 
combineStub, MemoryManager memoryManager, IOManager ioManager,
                        MutableObjectIterator<E> input, AbstractInvokable 
parentTask, 
                        TypeSerializerFactory<E> serializerFactory, 
TypeComparator<E> comparator,
                        double memoryFraction, int numSortBuffers, int 
maxNumFileHandles,
@@ -254,12 +255,12 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
                        
                        // ------------------- Spilling Phase 
------------------------
                        
-                       final GenericCombine<E> combineStub = 
CombiningUnilateralSortMerger.this.combineStub;
+                       final FlatCombineFunction<E> combineStub = 
CombiningUnilateralSortMerger.this.combineStub;
                        
                        // now that we are actually spilling, take the 
combiner, and open it
                        try {
-                               Configuration conf = 
CombiningUnilateralSortMerger.this.udfConfig; 
-                               combineStub.open(conf == null ? new 
Configuration() : conf);
+                               Configuration conf = 
CombiningUnilateralSortMerger.this.udfConfig;
+                               FunctionUtils.openFunction (combineStub, (conf 
== null ? new Configuration() : conf));
                        }
                        catch (Throwable t) {
                                throw new IOException("The user-defined 
combiner failed in its 'open()' method.", t);
@@ -380,7 +381,7 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
                        
                        // close the user code
                        try {
-                               combineStub.close();
+                               FunctionUtils.closeFunction(combineStub);
                        }
                        catch (Throwable t) {
                                throw new IOException("The user-defined 
combiner failed in its 'close()' method.", t);
@@ -466,7 +467,7 @@ public class CombiningUnilateralSortMerger<E> extends 
UnilateralSortMerger<E> {
                                                                                
                                                                        
this.memManager.getPageSize());
                        
                        final WriterCollector<E> collector = new 
WriterCollector<E>(output, this.serializer);
-                       final GenericCombine<E> combineStub = 
CombiningUnilateralSortMerger.this.combineStub;
+                       final FlatCombineFunction<E> combineStub = 
CombiningUnilateralSortMerger.this.combineStub;
 
                        // combine and write to disk
                        try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
index 308e333..2ed75ae 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java
@@ -25,7 +25,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -151,7 +151,7 @@ public class MergeMatchIterator<T1, T2, O> implements 
JoinTaskIterator<T1, T2, O
         * @see 
org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey()
         */
        @Override
-       public boolean callWithNextKey(final GenericJoiner<T1, T2, O> 
matchFunction, final Collector<O> collector)
+       public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> 
matchFunction, final Collector<O> collector)
        throws Exception
        {
                if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) {
@@ -234,7 +234,7 @@ public class MergeMatchIterator<T1, T2, O> implements 
JoinTaskIterator<T1, T2, O
         * @throws Exception Forwards all exceptions thrown by the stub.
         */
        private void crossFirst1withNValues(final T1 val1, final T2 firstValN,
-                       final Iterator<T2> valsN, final GenericJoiner<T1, T2, 
O> matchFunction, final Collector<O> collector)
+                       final Iterator<T2> valsN, final FlatJoinFunction<T1, 
T2, O> matchFunction, final Collector<O> collector)
        throws Exception
        {
                this.copy1 = this.serializer1.copy(val1, this.copy1);
@@ -267,7 +267,7 @@ public class MergeMatchIterator<T1, T2, O> implements 
JoinTaskIterator<T1, T2, O
         * @throws Exception Forwards all exceptions thrown by the stub.
         */
        private void crossSecond1withNValues(T2 val1, T1 firstValN,
-                       Iterator<T1> valsN, GenericJoiner<T1, T2, O> 
matchFunction, Collector<O> collector)
+                       Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> 
matchFunction, Collector<O> collector)
        throws Exception
        {
                this.copy2 = this.serializer2.copy(val1, this.copy2);
@@ -280,7 +280,7 @@ public class MergeMatchIterator<T1, T2, O> implements 
JoinTaskIterator<T1, T2, O
                        
                        if (valsN.hasNext()) {
                                this.copy2 = this.serializer2.copy(val1, 
this.copy2);
-                               matchFunction.join(nRec, this.copy2, collector);
+                               matchFunction.join(nRec,this.copy2,collector);
                        } else {
                                matchFunction.join(nRec, val1, collector);
                                more = false;
@@ -297,7 +297,7 @@ public class MergeMatchIterator<T1, T2, O> implements 
JoinTaskIterator<T1, T2, O
         */
        private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals,
                        final T2 firstV2, final Iterator<T2> blockVals,
-                       final GenericJoiner<T1, T2, O> matchFunction, final 
Collector<O> collector)
+                       final FlatJoinFunction<T1, T2, O> matchFunction, final 
Collector<O> collector)
        throws Exception
        {
                // ==================================================
@@ -411,7 +411,7 @@ public class MergeMatchIterator<T1, T2, O> implements 
JoinTaskIterator<T1, T2, O
                                                // get instances of key and 
block value
                                                final T2 nextBlockVal = 
this.blockIt.next();
                                                this.copy1 = 
this.serializer1.copy(nextSpillVal, this.copy1);
-                                               matchFunction.join(this.copy1, 
nextBlockVal, collector);        
+                                               matchFunction.join(this.copy1, 
nextBlockVal, collector);
                                        }
                                        
                                        // reset block iterator

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java
index 6bcb6ee..3ed647d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.operators.util;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
 import org.apache.flink.util.Collector;
 
@@ -60,7 +60,7 @@ public interface JoinTaskIterator<V1, V2, O>
         * @return True, if a next key exists, false if no more keys exist.
         * @throws Exception Exceptions from the user code are forwarded.
         */
-       boolean callWithNextKey(GenericJoiner<V1, V2, O> matchFunction, 
Collector<O> collector) throws Exception;
+       boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, 
Collector<O> collector) throws Exception;
        
        /**
         * Aborts the matching process. This extra abort method is supplied, 
because a significant time may pass while

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
index e91e100..3894233 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
@@ -23,13 +23,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.flink.api.common.functions.GenericJoiner;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.java.record.functions.JoinFunction;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
 import 
org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
-import org.apache.flink.runtime.operators.BuildFirstCachedMatchDriver;
-import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
 import 
org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
@@ -43,7 +40,7 @@ import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class CachedMatchTaskTest extends DriverTestBase<GenericJoiner<Record, 
Record, Record>>
+public class CachedMatchTaskTest extends 
DriverTestBase<FlatJoinFunction<Record, Record, Record>>
 {
        private static final long HASH_MEM = 6*1024*1024;
        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
index 46717be..5551485 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
@@ -23,12 +23,9 @@ import java.util.Iterator;
 
 import org.junit.Assert;
 
-import org.apache.flink.api.common.functions.GenericCoGrouper;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
 import 
org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory;
-import org.apache.flink.runtime.operators.CoGroupDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
@@ -37,7 +34,7 @@ import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-public class CoGroupTaskExternalITCase extends 
DriverTestBase<GenericCoGrouper<Record, Record, Record>>
+public class CoGroupTaskExternalITCase extends 
DriverTestBase<CoGroupFunction<Record, Record, Record>>
 {
        private static final long SORT_MEM = 3*1024*1024;
        
@@ -87,7 +84,7 @@ public class CoGroupTaskExternalITCase extends 
DriverTestBase<GenericCoGrouper<R
                Assert.assertEquals("Wrong result set size.", expCnt, 
this.output.getNumberOfRecords());
        }
        
-       public static final class MockCoGroupStub extends CoGroupFunction {
+       public static final class MockCoGroupStub extends 
org.apache.flink.api.java.record.functions.CoGroupFunction {
                private static final long serialVersionUID = 1L;
                
                private final Record res = new Record();

Reply via email to