[FLINK-1045][dataSet] Remove Combinable annotation.

This closes #1522


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5914e9ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5914e9ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5914e9ab

Branch: refs/heads/master
Commit: 5914e9ab3fa8e8be8fff9d7e6c10d612af7e27cf
Parents: af029e7
Author: Fabian Hueske <fhue...@apache.org>
Authored: Mon Jan 18 15:57:49 2016 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Jan 26 11:17:24 2016 +0100

----------------------------------------------------------------------
 docs/apis/batch/dataset_transformations.md      | 111 ++++++++++---------
 .../mapred/HadoopReduceCombineFunction.java     |  22 ++--
 .../common/functions/GroupReduceFunction.java   |   5 +
 .../functions/RichGroupReduceFunction.java      |  56 +---------
 .../flink/api/java/functions/FirstReducer.java  |   2 -
 .../api/java/operators/AggregateOperator.java   |  55 ++++-----
 .../api/java/operators/DistinctOperator.java    |  16 ++-
 .../api/java/operators/GroupReduceOperator.java |  53 ++++++++-
 .../PlanUnwrappingReduceGroupOperator.java      |  45 +++++---
 ...PlanUnwrappingSortedReduceGroupOperator.java |  33 ++++--
 .../runtime/ExpressionAggregateFunction.scala   |  23 +++-
 .../java/GroupReduceCompilationTest.java        |  40 +++++--
 .../testfunctions/IdentityGroupReducer.java     |   5 +-
 .../IdentityGroupReducerCombinable.java         |  13 ++-
 .../testfunctions/Top1GroupReducer.java         |  13 ++-
 .../operators/CombineTaskExternalITCase.java    |  13 ++-
 .../runtime/operators/CombineTaskTest.java      |  34 +++---
 .../operators/ReduceTaskExternalITCase.java     |   8 +-
 .../flink/runtime/operators/ReduceTaskTest.java |  10 +-
 .../operators/chaining/ChainTaskTest.java       |  15 ++-
 .../CombiningUnilateralSortMergerITCase.java    |  11 +-
 .../scala/operators/ScalaAggregateOperator.java |  21 ++--
 .../test/accumulators/AccumulatorITCase.java    |  18 +--
 .../javaApiOperators/GroupReduceITCase.java     |  81 +++++++-------
 .../examples/KMeansSingleStepTest.java          |  18 +--
 .../examples/RelationalQueryCompilerTest.java   |  21 ++--
 .../api/scala/operators/GroupReduceITCase.scala |  59 +++++-----
 27 files changed, 469 insertions(+), 332 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/docs/apis/batch/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/dataset_transformations.md 
b/docs/apis/batch/dataset_transformations.md
index 763de0f..31a1dfa 100644
--- a/docs/apis/batch/dataset_transformations.md
+++ b/docs/apis/batch/dataset_transformations.md
@@ -551,43 +551,49 @@ val output = input.groupBy(0).sortGroup(1, 
Order.ASCENDING).reduceGroup {
 #### Combinable GroupReduceFunctions
 
 In contrast to a reduce function, a group-reduce function is not
-necessarily combinable. In order to make a group-reduce function
-combinable, you need to use the `RichGroupReduceFunction` variant,
-implement (override) the `combine()` method, and annotate the
-`RichGroupReduceFunction` with the `@Combinable` annotation as shown here:
+implicitly combinable. In order to make a group-reduce function
+combinable it must implement the `GroupCombineFunction` interface.
+
+**Important**: The generic input and output types of 
+the `GroupCombineFunction` interface must be equal to the generic input type 
+of the `GroupReduceFunction` as shown in the following example:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
 ~~~java
-// Combinable GroupReduceFunction that computes two sums.
-// Note that we use the RichGroupReduceFunction because it defines the combine 
method
-@Combinable
-public class MyCombinableGroupReducer
-         extends RichGroupReduceFunction<Tuple3<String, Integer, Double>,
-                                     Tuple3<String, Integer, Double>> {
+// Combinable GroupReduceFunction that computes a sum.
+public class MyCombinableGroupReducer implements
+  GroupReduceFunction<Tuple2<String, Integer>, String>,
+  GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> 
+{
   @Override
-  public void reduce(Iterable<Tuple3<String, Integer, Double>> in,
-                     Collector<Tuple3<String, Integer, Double>> out) {
+  public void reduce(Iterable<Tuple2<String, Integer>> in,
+                     Collector<String> out) {
 
     String key = null;
-    int intSum = 0;
-    double doubleSum = 0.0;
+    int sum = 0;
 
-    for (Tuple3<String, Integer, Double> curr : in) {
+    for (Tuple2<String, Integer> curr : in) {
       key = curr.f0;
-      intSum += curr.f1;
-      doubleSum += curr.f2;
+      sum += curr.f1;
     }
-    // emit a tuple with both sums
-    out.collect(new Tuple3<String, Integer, Double>(key, intSum, doubleSum));
+    // concat key and sum and emit
+    out.collect(key + "-" + sum);
   }
 
   @Override
-  public void combine(Iterable<Tuple3<String, Integer, Double>> in,
-                      Collector<Tuple3<String, Integer, Double>> out) {
-    // in some cases combine() calls can simply be forwarded to reduce().
-    this.reduce(in, out);
+  public void combine(Iterable<Tuple2<String, Integer>> in,
+                      Collector<Tuple2<String, Integer>> out) {
+    String key = null;
+    int sum = 0;
+
+    for (Tuple2<String, Integer> curr : in) {
+      key = curr.f0;
+      sum += curr.f1;
+    }
+    // emit tuple with key and sum
+    out.collect(new Tuple2<>(key, sum)); 
   }
 }
 ~~~
@@ -598,33 +604,28 @@ public class MyCombinableGroupReducer
 ~~~scala
 
 // Combinable GroupReduceFunction that computes two sums.
-// Note that we use the RichGroupReduceFunction because it defines the combine 
method
-@Combinable
 class MyCombinableGroupReducer
-  extends RichGroupReduceFunction[(String, Int, Double), (String, Int, 
Double)] {}
-
-  def reduce(
-      in: java.lang.Iterable[(String, Int, Double)],
-      out: Collector[(String, Int, Double)]): Unit = {
-
-    val key: String = null
-    val intSum = 0
-    val doubleSum = 0.0
-
-    for (curr <- in) {
-      key = curr._1
-      intSum += curr._2
-      doubleSum += curr._3
-    }
-    // emit a tuple with both sums
-    out.collect(key, intSum, doubleSum);
+  extends GroupReduceFunction[(String, Int), String]
+  with GroupCombineFunction[(String, Int), (String, Int)]
+{
+  override def reduce(
+    in: java.lang.Iterable[(String, Int)],
+    out: Collector[String]): Unit =
+  {
+    val r: (String, Int) =
+      in.asScala.reduce( (a,b) => (a._1, a._2 + b._2) )
+    // concat key and sum and emit
+    out.collect (r._1 + "-" + r._2)
   }
 
-  def combine(
-      in: java.lang.Iterable[(String, Int, Double)],
-      out: Collector[(String, Int, Double)]): Unit = {
-    // in some cases combine() calls can simply be forwarded to reduce().
-    this.reduce(in, out)
+  override def combine(
+    in: java.lang.Iterable[(String, Int)],
+    out: Collector[(String, Int)]): Unit =
+  {
+    val r: (String, Int) =
+      in.asScala.reduce( (a,b) => (a._1, a._2 + b._2) )
+    // emit tuple with key and sum
+    out.collect(r)
   }
 }
 ~~~
@@ -635,14 +636,16 @@ class MyCombinableGroupReducer
 ~~~python
  class GroupReduce(GroupReduceFunction):
    def reduce(self, iterator, collector):
-     key, int_sum, float_sum = iterator.next()
+     key, int_sum = iterator.next()
      for value in iterator:
        int_sum += value[1]
-       float_sum += value[2]
-     collector.collect((key, int_sum, float_sum))
-   # in some cases combine() calls can simply be forwarded to reduce().
+     collector.collect(key + "-" + int_sum))
+
    def combine(self, iterator, collector):
-     return self.reduce(iterator, collector)
+     key, int_sum = iterator.next()
+     for value in iterator:
+       int_sum += value[1]
+     collector.collect((key, int_sum))
 
 data.reduce_group(GroupReduce(), combinable=True)
 ~~~
@@ -653,7 +656,7 @@ data.reduce_group(GroupReduce(), combinable=True)
 ### GroupCombine on a Grouped DataSet
 
 The GroupCombine transformation is the generalized form of the combine step in
-the Combinable GroupReduceFunction. It is generalized in the sense that it
+the combinable GroupReduceFunction. It is generalized in the sense that it
 allows combining of input type `I` to an arbitrary output type `O`. In 
contrast,
 the combine step in the GroupReduce only allows combining from input type `I` 
to
 output type `I`. This is because the reduce step in the GroupReduceFunction
@@ -872,7 +875,7 @@ val output = input.reduceGroup(new MyGroupReducer())
 
 **Note:** A GroupReduce transformation on a full DataSet cannot be done in 
parallel if the
 group-reduce function is not combinable. Therefore, this can be a very compute 
intensive operation.
-See the paragraph on "Combineable Group-Reduce Functions" above to learn how 
to implement a
+See the paragraph on "Combinable GroupReduceFunctions" above to learn how to 
implement a
 combinable group-reduce function.
 
 ### GroupCombine on a full DataSet

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
 
b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
index 97b9768..e164d89 100644
--- 
a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
+++ 
b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
@@ -23,6 +23,7 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -44,10 +45,10 @@ import org.apache.hadoop.mapred.Reporter;
  * This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a 
combinable Flink GroupReduceFunction.
  */
 @SuppressWarnings("rawtypes")
-@org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
 public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, 
VALUEOUT> 
-                                       extends 
RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> 
-                                       implements 
ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+       extends 
RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
+       implements GroupCombineFunction<Tuple2<KEYIN,VALUEIN>, 
Tuple2<KEYIN,VALUEIN>>,
+                               ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, 
Serializable {
 
        private static final long serialVersionUID = 1L;
 
@@ -104,10 +105,10 @@ public final class HadoopReduceCombineFunction<KEYIN, 
VALUEIN, KEYOUT, VALUEOUT>
                
                this.reporter = new HadoopDummyReporter();
                Class<KEYIN> inKeyClass = (Class<KEYIN>) 
TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
-               TypeSerializer<KEYIN> keySerializer = 
TypeExtractor.getForClass((Class<KEYIN>) 
inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
-               this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, 
VALUEIN>(keySerializer);
-               this.combineCollector = new HadoopOutputCollector<KEYIN, 
VALUEIN>();
-               this.reduceCollector = new HadoopOutputCollector<KEYOUT, 
VALUEOUT>();
+               TypeSerializer<KEYIN> keySerializer = 
TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
+               this.valueIterator = new 
HadoopTupleUnwrappingIterator<>(keySerializer);
+               this.combineCollector = new HadoopOutputCollector<>();
+               this.reduceCollector = new HadoopOutputCollector<>();
        }
 
        @Override
@@ -131,9 +132,9 @@ public final class HadoopReduceCombineFunction<KEYIN, 
VALUEIN, KEYOUT, VALUEOUT>
                Class<KEYOUT> outKeyClass = (Class<KEYOUT>) 
TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
                Class<VALUEOUT> outValClass = 
(Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, 
reducer.getClass(), 3);
 
-               final TypeInformation<KEYOUT> keyTypeInfo = 
TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
-               final TypeInformation<VALUEOUT> valueTypleInfo = 
TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
-               return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, 
valueTypleInfo);
+               final TypeInformation<KEYOUT> keyTypeInfo = 
TypeExtractor.getForClass(outKeyClass);
+               final TypeInformation<VALUEOUT> valueTypleInfo = 
TypeExtractor.getForClass(outValClass);
+               return new TupleTypeInfo<>(keyTypeInfo, valueTypleInfo);
        }
 
        /**
@@ -161,4 +162,5 @@ public final class HadoopReduceCombineFunction<KEYIN, 
VALUEIN, KEYOUT, VALUEOUT>
                jobConf = new JobConf();
                jobConf.readFields(in);
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
index 37490c5..42b8e0c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java
@@ -38,6 +38,11 @@ import org.apache.flink.util.Collector;
  * 
  * DataSet<X> result = input.groupBy(<key-definition>).reduceGroup(new 
MyGroupReduceFunction());
  * }</pre>
+ *
+ * Partial computation can significantly improve the performance of a {@link 
GroupReduceFunction}.
+ * This technique is also known as applying a Combiner.
+ * Implement the {@link GroupCombineFunction<T, T>} interface to enable 
partial computations, i.e.,
+ * a combiner for this {@link GroupReduceFunction}.
  * 
  * @param <T> Type of the elements that this function processes.
  * @param <O> The type of the elements returned by the user-defined function.

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
index 94d60a8..2c74902 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.api.common.functions;
 
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
 import org.apache.flink.annotation.Public;
 import org.apache.flink.util.Collector;
 
@@ -31,60 +26,21 @@ import org.apache.flink.util.Collector;
  * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides 
setup and teardown methods:
  * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
  * {@link RichFunction#close()}.
+ *
+ * Partial computation can significantly improve the performance of a {@link 
RichGroupReduceFunction}.
+ * This technique is also known as applying a Combiner.
+ * Implement the {@link GroupCombineFunction<IN, IN>} interface to enable 
partial computation, i.e.,
+ * a combiner for this {@link RichGroupReduceFunction}.
  * 
  * @param <IN> Type of the elements that this function processes.
  * @param <OUT> The type of the elements returned by the user-defined function.
  */
 @Public
-public abstract class RichGroupReduceFunction<IN, OUT> extends 
AbstractRichFunction implements GroupReduceFunction<IN, OUT>, 
GroupCombineFunction<IN, IN> {
+public abstract class RichGroupReduceFunction<IN, OUT> extends 
AbstractRichFunction implements GroupReduceFunction<IN, OUT> {
        
        private static final long serialVersionUID = 1L;
 
        @Override
        public abstract void reduce(Iterable<IN> values, Collector<OUT> out) 
throws Exception;
        
-       /**
-        * The combine methods pre-reduces elements. It may be called on 
subsets of the data
-        * before the actual reduce function. This is often helpful to lower 
data volume prior
-        * to reorganizing the data in an expensive way, as might be required 
for the final
-        * reduce function.
-        * <p>
-        * This method is only ever invoked when the subclass of {@link 
RichGroupReduceFunction}
-        * adds the {@link Combinable} annotation, or if the <i>combinable</i> 
flag is set when defining
-        * the <i>reduceGroup</i> operation via
-        * 
org.apache.flink.api.java.operators.GroupReduceOperator#setCombinable(boolean).
-        * <p>
-        * Since the reduce function will be called on the result of this 
method, it is important that this
-        * method returns the same data type as it consumes. By default, this 
method only calls the
-        * {@link #reduce(Iterable, Collector)} method. If the behavior in the 
pre-reducing is different
-        * from the final reduce function (for example because the reduce 
function changes the data type),
-        * this method must be overwritten, or the execution will fail.
-        * 
-        * @param values The iterator returning the group of values to be 
reduced.
-        * @param out The collector to emit the returned values.
-        * 
-        * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
-        *                   to fail and may trigger recovery.
-        */
-       @Override
-       public void combine(Iterable<IN> values, Collector<IN> out) throws 
Exception {
-               @SuppressWarnings("unchecked")
-               Collector<OUT> c = (Collector<OUT>) out;
-               reduce(values, c);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * This annotation can be added to classes that extend {@link 
RichGroupReduceFunction}, in oder to mark
-        * them as "combinable". The system may call the {@link 
RichGroupReduceFunction#combine(Iterable, Collector)}
-        * method on such functions, to pre-reduce the data before transferring 
it over the network to
-        * the actual group reduce operation.
-        * <p>
-        * Marking combinable functions as such is in general beneficial for 
performance.
-        */
-       @Retention(RetentionPolicy.RUNTIME)
-       @Target(ElementType.TYPE)
-       @Public
-       public static @interface Combinable {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
index a604cc0..2eda077 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
@@ -19,10 +19,8 @@
 package org.apache.flink.api.java.functions;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
-import 
org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.util.Collector;
 
-@Combinable
 public class FirstReducer<T> implements GroupReduceFunction<T, T>, 
GroupCombineFunction<T, T> {
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
index d65ba0d..17dff69 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java
@@ -19,13 +19,12 @@
 package org.apache.flink.api.java.operators;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import 
org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
@@ -49,9 +48,9 @@ import com.google.common.base.Preconditions;
  */
 public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, 
AggregateOperator<IN>> {
        
-       private final List<AggregationFunction<?>> aggregationFunctions = new 
ArrayList<AggregationFunction<?>>(4);
+       private final List<AggregationFunction<?>> aggregationFunctions = new 
ArrayList<>(4);
        
-       private final List<Integer> fields = new ArrayList<Integer>(4);
+       private final List<Integer> fields = new ArrayList<>(4);
        
        private final Grouping<IN> grouping;
        
@@ -186,7 +185,7 @@ public class AggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Aggregate
                // distinguish between grouped reduce and non-grouped reduce
                if (this.grouping == null) {
                        // non grouped aggregation
-                       UnaryOperatorInformation<IN, IN> operatorInfo = new 
UnaryOperatorInformation<IN, IN>(getInputType(), getResultType());
+                       UnaryOperatorInformation<IN, IN> operatorInfo = new 
UnaryOperatorInformation<>(getInputType(), getResultType());
                        GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, 
IN>> po =
                                        new GroupReduceOperatorBase<IN, IN, 
GroupReduceFunction<IN, IN>>(function, operatorInfo, new int[0], name);
                        
@@ -203,7 +202,7 @@ public class AggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Aggregate
                if (this.grouping.getKeys() instanceof Keys.ExpressionKeys) {
                        // grouped aggregation
                        int[] logicalKeyPositions = 
this.grouping.getKeys().computeLogicalKeyPositions();
-                       UnaryOperatorInformation<IN, IN> operatorInfo = new 
UnaryOperatorInformation<IN, IN>(getInputType(), getResultType());
+                       UnaryOperatorInformation<IN, IN> operatorInfo = new 
UnaryOperatorInformation<>(getInputType(), getResultType());
                        GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, 
IN>> po =
                                        new GroupReduceOperatorBase<IN, IN, 
GroupReduceFunction<IN, IN>>(function, operatorInfo, logicalKeyPositions, name);
                        
@@ -214,19 +213,17 @@ public class AggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Aggregate
                        
po.setCustomPartitioner(grouping.getCustomPartitioner());
                        
                        SingleInputSemanticProperties props = new 
SingleInputSemanticProperties();
-                       
-                       for (int i = 0; i < logicalKeyPositions.length; i++) {
-                               int keyField = logicalKeyPositions[i];
+
+                       for (int keyField : logicalKeyPositions) {
                                boolean keyFieldUsedInAgg = false;
-                               
-                               for (int k = 0; k < fields.length; k++) {
-                                       int aggField = fields[k];
+
+                               for (int aggField : fields) {
                                        if (keyField == aggField) {
                                                keyFieldUsedInAgg = true;
                                                break;
                                        }
                                }
-                               
+
                                if (!keyFieldUsedInAgg) {
                                        props.addForwardedField(keyField, 
keyField);
                                }
@@ -247,8 +244,10 @@ public class AggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Aggregate
        
        // 
--------------------------------------------------------------------------------------------
        
-       @Combinable
-       public static final class AggregatingUdf<T extends Tuple> extends 
RichGroupReduceFunction<T, T> {
+       public static final class AggregatingUdf<T extends Tuple>
+               extends RichGroupReduceFunction<T, T>
+               implements GroupCombineFunction<T, T> {
+
                private static final long serialVersionUID = 1L;
                
                private final int[] fieldPositions;
@@ -268,8 +267,8 @@ public class AggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Aggregate
 
                @Override
                public void open(Configuration parameters) throws Exception {
-                       for (int i = 0; i < aggFunctions.length; i++) {
-                               aggFunctions[i].initializeAggregate();
+                       for (AggregationFunction<Object> aggFunction : 
aggFunctions) {
+                               aggFunction.initializeAggregate();
                        }
                }
                
@@ -280,24 +279,28 @@ public class AggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Aggregate
 
                        // aggregators are initialized from before
                        
-                       T current = null;
-                       final Iterator<T> values = records.iterator();
-                       while (values.hasNext()) {
-                               current = values.next();
-                               
+                       T outT = null;
+                       for (T record : records) {
+                               outT = record;
+
                                for (int i = 0; i < fieldPositions.length; i++) 
{
-                                               Object val = 
current.getFieldNotNull(fieldPositions[i]);
-                                               aggFunctions[i].aggregate(val);
+                                       Object val = 
record.getFieldNotNull(fieldPositions[i]);
+                                       aggFunctions[i].aggregate(val);
                                }
                        }
                        
                        for (int i = 0; i < fieldPositions.length; i++) {
                                Object aggVal = aggFunctions[i].getAggregate();
-                               current.setField(aggVal, fieldPositions[i]);
+                               outT.setField(aggVal, fieldPositions[i]);
                                aggFunctions[i].initializeAggregate();
                        }
                        
-                       out.collect(current);
+                       out.collect(outT);
+               }
+
+               @Override
+               public void combine(Iterable<T> records, Collector<T> out) {
+                       reduce(records, out);
                }
                
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index d85c9a6..5102c80 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.api.java.operators;
 
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
 import 
org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -59,7 +59,7 @@ public class DistinctOperator<T> extends 
SingleInputOperator<T, T, DistinctOpera
        @Override
        protected 
org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, T, ?> 
translateToDataFlow(Operator<T> input) {
 
-               final RichGroupReduceFunction<T, T> function = new 
DistinctFunction<>();
+               final GroupReduceFunction<T, T> function = new 
DistinctFunction<>();
 
                String name = getName() != null ? getName() : "Distinct at " + 
distinctLocationName;
 
@@ -68,7 +68,7 @@ public class DistinctOperator<T> extends 
SingleInputOperator<T, T, DistinctOpera
                        int[] logicalKeyPositions = 
keys.computeLogicalKeyPositions();
                        UnaryOperatorInformation<T, T> operatorInfo = new 
UnaryOperatorInformation<>(getInputType(), getResultType());
                        GroupReduceOperatorBase<T, T, GroupReduceFunction<T, 
T>> po =
-                                       new GroupReduceOperatorBase<T, T, 
GroupReduceFunction<T, T>>(function, operatorInfo, logicalKeyPositions, name);
+                                       new GroupReduceOperatorBase<>(function, 
operatorInfo, logicalKeyPositions, name);
 
                        po.setCombinable(true);
                        po.setInput(input);
@@ -108,7 +108,7 @@ public class DistinctOperator<T> extends 
SingleInputOperator<T, T, DistinctOpera
 
        private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, 
K> translateSelectorFunctionDistinct(
                        SelectorFunctionKeys<IN, ?> rawKeys,
-                       RichGroupReduceFunction<IN, OUT> function,
+                       GroupReduceFunction<IN, OUT> function,
                        TypeInformation<OUT> outputType,
                        String name,
                        Operator<IN> input)
@@ -126,8 +126,7 @@ public class DistinctOperator<T> extends 
SingleInputOperator<T, T, DistinctOpera
                return reducer;
        }
        
-       @RichGroupReduceFunction.Combinable
-       public static final class DistinctFunction<T> extends 
RichGroupReduceFunction<T, T> {
+       public static final class DistinctFunction<T> implements 
GroupReduceFunction<T, T>, GroupCombineFunction<T, T> {
 
                private static final long serialVersionUID = 1L;
 
@@ -135,5 +134,10 @@ public class DistinctOperator<T> extends 
SingleInputOperator<T, T, DistinctOpera
                public void reduce(Iterable<T> values, Collector<T> out) {
                        out.collect(values.iterator().next());
                }
+
+               @Override
+               public void combine(Iterable<T> values, Collector<T> out) {
+                       out.collect(values.iterator().next());
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index 5225b33..91f2efd 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -27,7 +27,6 @@ import 
org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
 import 
org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
@@ -35,6 +34,11 @@ import 
org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedReduc
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.DataSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
 
 /**
  * This operator represents the application of a "reduceGroup" function on a 
data set, and the
@@ -45,6 +49,8 @@ import org.apache.flink.api.java.DataSet;
  */
 public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, 
OUT, GroupReduceOperator<IN, OUT>> {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(GroupReduceOperator.class);
+
        private final GroupReduceFunction<IN, OUT> function;
 
        private final Grouping<IN> grouper;
@@ -88,9 +94,48 @@ public class GroupReduceOperator<IN, OUT> extends 
SingleInputUdfOperator<IN, OUT
        }
 
        private void checkCombinability() {
-               if (function instanceof GroupCombineFunction &&
-                               
function.getClass().getAnnotation(RichGroupReduceFunction.Combinable.class) != 
null) {
-                       this.combinable = true;
+               if (function instanceof GroupCombineFunction) {
+
+                       // check if the generic types of GroupCombineFunction 
and GroupReduceFunction match, i.e.,
+                       //   GroupCombineFunction<IN, IN> and 
GroupReduceFunction<IN, OUT>.
+                       // This is a best effort check. If the check cannot be 
done, we might fail at runtime.
+                       Type[] reduceTypes = null;
+                       Type[] combineTypes = null;
+
+                       Type[] genInterfaces = 
function.getClass().getGenericInterfaces();
+                       for (Type genInterface : genInterfaces) {
+                               if (genInterface instanceof ParameterizedType) {
+                                       // get parameters of GroupReduceFunction
+                                       if (((ParameterizedType) 
genInterface).getRawType().equals(GroupReduceFunction.class)) {
+                                               reduceTypes = 
((ParameterizedType) genInterface).getActualTypeArguments();
+                                       // get parameters of 
GroupCombineFunction
+                                       } else if (((ParameterizedType) 
genInterface).getRawType().equals(GroupCombineFunction.class)) {
+                                               combineTypes = 
((ParameterizedType) genInterface).getActualTypeArguments();
+                                       }
+                               }
+                       }
+
+                       if (reduceTypes != null && reduceTypes.length == 2 &&
+                               combineTypes != null && combineTypes.length == 
2) {
+
+                               if (reduceTypes[0].equals(combineTypes[0]) && 
reduceTypes[0].equals(combineTypes[1])) {
+                                       this.combinable = true;
+                               } else {
+                                       LOG.warn("GroupCombineFunction cannot 
be used as combiner for GroupReduceFunction. " +
+                                               "Generic types are 
incompatible.");
+                                       this.combinable = false;
+                               }
+                       }
+                       else if (reduceTypes == null || reduceTypes.length != 
2) {
+                               LOG.warn("Cannot check generic types of 
GroupReduceFunction. " +
+                                       "Enabling combiner but combine function 
might fail at runtime.");
+                               this.combinable = true;
+                       }
+                       else {
+                               LOG.warn("Cannot check generic types of 
GroupCombineFunction. " +
+                                       "Enabling combiner but combine function 
might fail at runtime.");
+                               this.combinable = true;
+                       }
                }
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index e01af50..72b79a4 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -23,7 +23,6 @@ import 
org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
@@ -34,31 +33,43 @@ import org.apache.flink.util.Collector;
  */
 public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends 
GroupReduceOperatorBase<Tuple2<K, IN>, OUT, GroupReduceFunction<Tuple2<K, 
IN>,OUT>> {
 
-       public PlanUnwrappingReduceGroupOperator(GroupReduceFunction<IN, OUT> 
udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
-                       TypeInformation<OUT> outType, TypeInformation<Tuple2<K, 
IN>> typeInfoWithKey, boolean combinable)
+       public PlanUnwrappingReduceGroupOperator(
+               GroupReduceFunction<IN, OUT> udf,
+               Keys.SelectorFunctionKeys<IN, K> key,
+               String name,
+               TypeInformation<OUT> outType,
+               TypeInformation<Tuple2<K, IN>> typeInfoWithKey,
+               boolean combinable)
        {
-               super(combinable ? new 
TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, 
K>((RichGroupReduceFunction<IN, OUT>) udf) : new 
TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
-                               new UnaryOperatorInformation<Tuple2<K, IN>, 
OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);
+               super(
+                       combinable ?
+                               new 
TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K>(udf) :
+                               new 
TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
+                       new UnaryOperatorInformation<>(typeInfoWithKey, 
outType), key.computeLogicalKeyPositions(), name);
                
                super.setCombinable(combinable);
        }
        
        // 
--------------------------------------------------------------------------------------------
        
-       @RichGroupReduceFunction.Combinable
-       public static final class 
TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K> extends 
WrappingFunction<RichGroupReduceFunction<IN, OUT>>
+       public static final class 
TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K> extends 
WrappingFunction<GroupReduceFunction<IN, OUT>>
                implements GroupReduceFunction<Tuple2<K, IN>, OUT>, 
GroupCombineFunction<Tuple2<K, IN>, Tuple2<K, IN>>
        {
 
                private static final long serialVersionUID = 1L;
                
                private TupleUnwrappingIterator<IN, K> iter;
-               private TupleWrappingCollector<IN, K> coll; 
-               
-               private 
TupleUnwrappingGroupCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> 
wrapped) {
+               private TupleWrappingCollector<IN, K> coll;
+
+               private 
TupleUnwrappingGroupCombinableGroupReducer(GroupReduceFunction<IN, OUT> 
wrapped) {
                        super(wrapped);
-                       this.iter = new TupleUnwrappingIterator<IN, K>();
-                       this.coll = new TupleWrappingCollector<IN, 
K>(this.iter);
+
+                       
if(!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) {
+                               throw new IllegalArgumentException("Wrapped 
reduce function does not implement the GroupCombineFunction interface.");
+                       }
+
+                       this.iter = new TupleUnwrappingIterator<>();
+                       this.coll = new TupleWrappingCollector<>(this.iter);
                }
 
 
@@ -68,11 +79,13 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> 
extends GroupReduceOp
                        this.wrappedFunction.reduce(iter, out);
                }
 
+               @SuppressWarnings("unchecked")
                @Override
                public void combine(Iterable<Tuple2<K, IN>> values, 
Collector<Tuple2<K, IN>> out) throws Exception {
-                               iter.set(values.iterator());
-                               coll.set(out);
-                               this.wrappedFunction.combine(iter, coll);
+
+                       iter.set(values.iterator());
+                       coll.set(out);
+                       ((GroupCombineFunction<IN, 
IN>)this.wrappedFunction).combine(iter, coll);
                }
                
                @Override
@@ -91,7 +104,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> 
extends GroupReduceOp
                
                private 
TupleUnwrappingNonCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) {
                        super(wrapped);
-                       this.iter = new TupleUnwrappingIterator<IN, K>();
+                       this.iter = new TupleUnwrappingIterator<>();
                }
        
        

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
index e4d41f4..278d706 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
@@ -23,7 +23,6 @@ import 
org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.operators.Keys;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.Collector;
@@ -34,18 +33,26 @@ import org.apache.flink.util.Collector;
  */
 public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends 
GroupReduceOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupReduceFunction<Tuple3<K1, 
K2, IN>,OUT>> {
 
-       public PlanUnwrappingSortedReduceGroupOperator(GroupReduceFunction<IN, 
OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, 
Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name,
-                                                                               
        TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> 
typeInfoWithKey, boolean combinable)
+       public PlanUnwrappingSortedReduceGroupOperator(
+               GroupReduceFunction<IN, OUT> udf,
+               Keys.SelectorFunctionKeys<IN, K1> groupingKey,
+               Keys.SelectorFunctionKeys<IN, K2> sortingKey,
+               String name,
+               TypeInformation<OUT> outType,
+               TypeInformation<Tuple3<K1, K2, IN>>
+               typeInfoWithKey, boolean combinable)
        {
-               super(combinable ? new 
TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2>(udf) : new 
TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2>(udf),
-                       new UnaryOperatorInformation<Tuple3<K1, K2, IN>, 
OUT>(typeInfoWithKey, outType), groupingKey.computeLogicalKeyPositions(), name);
+               super(
+                       combinable ?
+                               new 
TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2>(udf) :
+                               new 
TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2>(udf),
+                       new UnaryOperatorInformation<>(typeInfoWithKey, 
outType), groupingKey.computeLogicalKeyPositions(), name);
 
                super.setCombinable(combinable);
        }
 
        // 
--------------------------------------------------------------------------------------------
 
-       @RichGroupReduceFunction.Combinable
        public static final class 
TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends 
WrappingFunction<GroupReduceFunction<IN, OUT>>
                implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, 
GroupCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>>
        {
@@ -57,8 +64,13 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, 
OUT, K1, K2> extends Gr
 
                private 
TupleUnwrappingGroupCombinableGroupReducer(GroupReduceFunction<IN, OUT> 
wrapped) {
                        super(wrapped);
-                       this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>();
-                       this.coll = new Tuple3WrappingCollector<IN, K1, 
K2>(this.iter);
+
+                       
if(!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) {
+                               throw new IllegalArgumentException("Wrapped 
reduce function does not implement the GroupCombineFunction interface.");
+                       }
+
+                       this.iter = new Tuple3UnwrappingIterator<>();
+                       this.coll = new Tuple3WrappingCollector<>(this.iter);
                }
 
 
@@ -68,11 +80,12 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, 
OUT, K1, K2> extends Gr
                        this.wrappedFunction.reduce(iter, out);
                }
 
+               @SuppressWarnings("unchecked")
                @Override
                public void combine(Iterable<Tuple3<K1, K2, IN>> values, 
Collector<Tuple3<K1, K2, IN>> out) throws Exception {
                        iter.set(values.iterator());
                        coll.set(out);
-                       
((GroupCombineFunction)this.wrappedFunction).combine(iter, coll);
+                       ((GroupCombineFunction<IN, 
IN>)this.wrappedFunction).combine(iter, coll);
                }
 
                @Override
@@ -91,7 +104,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, 
OUT, K1, K2> extends Gr
 
                private 
TupleUnwrappingNonCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) {
                        super(wrapped);
-                       this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>();
+                       this.iter = new Tuple3UnwrappingIterator<>();
                }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
index 932f9df..38afc21 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
@@ -18,17 +18,17 @@
 package org.apache.flink.api.table.runtime
 
 import org.apache.flink.api.table.Row
-import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
+import org.apache.flink.api.common.functions.{GroupReduceFunction, 
GroupCombineFunction, RichGroupReduceFunction}
 import org.apache.flink.api.java.aggregation.AggregationFunction
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.util.Collector
 
-@Combinable
 class ExpressionAggregateFunction(
     private val fieldPositions: Seq[Int],
     private val functions: Seq[AggregationFunction[Any]])
-  extends RichGroupReduceFunction[Row, Row] {
+  extends RichGroupReduceFunction[Row, Row]
+  with GroupCombineFunction[Row, Row]
+{
 
   override def open(conf: Configuration): Unit = {
     var i = 0
@@ -69,10 +69,17 @@ class ExpressionAggregateFunction(
     out.collect(current)
   }
 
+  override def combine(in: java.lang.Iterable[Row], out: Collector[Row]): Unit 
= {
+    reduce(in, out)
+  }
+
 }
 
-@Combinable
-class NoExpressionAggregateFunction() extends RichGroupReduceFunction[Row, 
Row] {
+
+class NoExpressionAggregateFunction()
+  extends GroupReduceFunction[Row, Row]
+  with GroupCombineFunction[Row, Row]
+{
 
   override def reduce(in: java.lang.Iterable[Row], out: Collector[Row]): Unit 
= {
 
@@ -86,4 +93,8 @@ class NoExpressionAggregateFunction() extends 
RichGroupReduceFunction[Row, Row]
     out.collect(first)
   }
 
+  override def combine(in: java.lang.Iterable[Row], out: Collector[Row]): Unit 
= {
+    reduce(in, out)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
index 6b49dd4..26a2faf 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.optimizer.java;
 
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -92,11 +94,9 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                        env.setParallelism(8);
                        
                        DataSet<Long> data = env.generateSequence(1, 
8000000).name("source");
-                       
-                       GroupReduceOperator<Long, Long> reduced = 
data.reduceGroup(new RichGroupReduceFunction<Long, Long>() {
-                               public void reduce(Iterable<Long> values, 
Collector<Long> out) {}
-                       }).name("reducer");
-                       
+
+                       GroupReduceOperator<Long, Long> reduced = 
data.reduceGroup(new CombineReducer2()).name("reducer");
+
                        reduced.setCombinable(true);
                        reduced.output(new 
DiscardingOutputFormat<Long>()).name("sink");
                        
@@ -195,9 +195,7 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                        
                        GroupReduceOperator<Tuple2<String, Double>, 
Tuple2<String, Double>> reduced = data
                                        .groupBy(1)
-                                       .reduceGroup(new 
RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
-                               public void reduce(Iterable<Tuple2<String, 
Double>> values, Collector<Tuple2<String, Double>> out) {}
-                       }).name("reducer");
+                                       .reduceGroup(new 
CombineReducer()).name("reducer");
                        
                        reduced.setCombinable(true);
                        reduced.output(new 
DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
@@ -313,9 +311,7 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                                .groupBy(new KeySelector<Tuple2<String,Double>, 
String>() { 
                                        public String getKey(Tuple2<String, 
Double> value) { return value.f0; }
                                })
-                               .reduceGroup(new 
RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
-                               public void reduce(Iterable<Tuple2<String, 
Double>> values, Collector<Tuple2<String, Double>> out) {}
-                       }).name("reducer");
+                               .reduceGroup(new 
CombineReducer()).name("reducer");
                        
                        reduced.setCombinable(true);
                        reduced.output(new 
DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
@@ -366,4 +362,26 @@ public class GroupReduceCompilationTest extends 
CompilerTestBase implements java
                        fail(e.getClass().getSimpleName() + " in test: " + 
e.getMessage());
                }
        }
+
+       public static class CombineReducer implements
+               GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, 
Double>>,
+               GroupCombineFunction<Tuple2<String, Double>, Tuple2<String, 
Double>> {
+
+               @Override
+               public void reduce(Iterable<Tuple2<String, Double>> values, 
Collector<Tuple2<String, Double>> out) {}
+
+               @Override
+               public void combine(Iterable<Tuple2<String, Double>> values, 
Collector<Tuple2<String, Double>> out) {}
+       }
+
+       public static class CombineReducer2 implements
+               GroupReduceFunction<Long, Long>,
+               GroupCombineFunction<Long, Long> {
+
+               @Override
+               public void reduce(Iterable<Long> values, Collector<Long> out) 
{}
+
+               @Override
+               public void combine(Iterable<Long> values, Collector<Long> out) 
{}
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
index da4ef17..0336771 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java
@@ -19,11 +19,10 @@
 package org.apache.flink.optimizer.testfunctions;
 
 
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import 
org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.util.Collector;
 
-public class IdentityGroupReducer<T> extends RichGroupReduceFunction<T, T> {
+public class IdentityGroupReducer<T> implements GroupReduceFunction<T, T> {
 
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
index ce24bb6..e1dbdd5 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java
@@ -19,12 +19,12 @@
 package org.apache.flink.optimizer.testfunctions;
 
 
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import 
org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.util.Collector;
 
-@Combinable
-public class IdentityGroupReducerCombinable<T> extends 
RichGroupReduceFunction<T, T> {
+public class IdentityGroupReducerCombinable<T>
+       implements GroupReduceFunction<T, T>, GroupCombineFunction<T, T> {
 
        private static final long serialVersionUID = 1L;
 
@@ -34,4 +34,9 @@ public class IdentityGroupReducerCombinable<T> extends 
RichGroupReduceFunction<T
                        out.collect(next);
                }
        }
+
+       @Override
+       public void combine(Iterable<T> values, Collector<T> out) {
+               reduce(values, out);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java
index 48d13ca..7213a25 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.optimizer.testfunctions;
 
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import 
org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.util.Collector;
 
 
-@Combinable
-public class Top1GroupReducer<T> extends RichGroupReduceFunction<T, T> {
+public class Top1GroupReducer<T>
+       implements GroupReduceFunction<T, T>, GroupCombineFunction<T, T> {
 
        private static final long serialVersionUID = 1L;
 
@@ -32,4 +32,9 @@ public class Top1GroupReducer<T> extends 
RichGroupReduceFunction<T, T> {
        public void reduce(Iterable<T> values, Collector<T> out) {
                out.collect(values.iterator().next());
        }
+
+       @Override
+       public void combine(Iterable<T> values, Collector<T> out) {
+               out.collect(values.iterator().next());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
index e162d7d..1699f79 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
@@ -22,11 +22,12 @@ import java.util.ArrayList;
 import java.util.HashMap;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import 
org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
@@ -166,8 +167,9 @@ public class CombineTaskExternalITCase extends 
DriverTestBase<RichGroupReduceFun
        // 
------------------------------------------------------------------------
        // 
------------------------------------------------------------------------
 
-       @Combinable
-       public static class MockCombiningReduceStub extends 
RichGroupReduceFunction<Record, Record> {
+       public static class MockCombiningReduceStub implements
+               GroupReduceFunction<Record, Record>, 
GroupCombineFunction<Record, Record>
+       {
                private static final long serialVersionUID = 1L;
 
                private final IntValue theInteger = new IntValue();
@@ -194,8 +196,9 @@ public class CombineTaskExternalITCase extends 
DriverTestBase<RichGroupReduceFun
                }
        }
 
-       @Combinable
-       public static final class MockFailingCombiningReduceStub extends 
RichGroupReduceFunction<Record, Record> {
+       public static final class MockFailingCombiningReduceStub implements
+               GroupReduceFunction<Record, Record>, 
GroupCombineFunction<Record, Record>
+       {
                private static final long serialVersionUID = 1L;
 
                private int cnt = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
index b6ce2d4..f0d9a8c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -51,15 +53,15 @@ public class CombineTaskTest
 
        private final double combine_frac;
        
-       private final ArrayList<Tuple2<Integer, Integer>> outList = new 
ArrayList<Tuple2<Integer, Integer>>();
+       private final ArrayList<Tuple2<Integer, Integer>> outList = new 
ArrayList<>();
 
        @SuppressWarnings("unchecked")
-       private final TypeSerializer<Tuple2<Integer, Integer>> serializer = new 
TupleSerializer<Tuple2<Integer, Integer>>(
+       private final TypeSerializer<Tuple2<Integer, Integer>> serializer = new 
TupleSerializer<>(
                        (Class<Tuple2<Integer, Integer>>) (Class<?>) 
Tuple2.class,
                        new TypeSerializer<?>[] { IntSerializer.INSTANCE, 
IntSerializer.INSTANCE });
        
        
-       private final TypeComparator<Tuple2<Integer, Integer>> comparator = new 
TupleComparator<Tuple2<Integer, Integer>>(
+       private final TypeComparator<Tuple2<Integer, Integer>> comparator = new 
TupleComparator<>(
                        new int[]{0},
                        new TypeComparator<?>[] { new IntComparator(true) },
                        new TypeSerializer<?>[] { IntSerializer.INSTANCE });
@@ -88,7 +90,7 @@ public class CombineTaskTest
                        getTaskConfig().setFilehandlesDriver(2);
                        
                        final GroupReduceCombineDriver<Tuple2<Integer, 
Integer>, Tuple2<Integer, Integer>> testTask =
-                                       new 
GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>();
+                                       new GroupReduceCombineDriver<>();
                        
                        testDriver(testTask, MockCombiningReduceStub.class);
                        
@@ -127,7 +129,7 @@ public class CombineTaskTest
                        getTaskConfig().setFilehandlesDriver(2);
                        
                        final GroupReduceCombineDriver<Tuple2<Integer, 
Integer>, Tuple2<Integer, Integer>> testTask = 
-                                       new 
GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>();
+                                       new GroupReduceCombineDriver<>();
                        
                        try {
                                testDriver(testTask, 
MockFailingCombiningReduceStub.class);
@@ -147,7 +149,7 @@ public class CombineTaskTest
        public void testCancelCombineTaskSorting()  {
                try {
                        MutableObjectIterator<Tuple2<Integer, Integer>> 
slowInfiniteInput =
-                                       new DelayingIterator<Tuple2<Integer, 
Integer>>(new InfiniteIntTupleIterator(), 1);
+                                       new DelayingIterator<>(new 
InfiniteIntTupleIterator(), 1);
                        
                        setInput(slowInfiniteInput, serializer);
                        addDriverComparator(this.comparator);
@@ -159,7 +161,7 @@ public class CombineTaskTest
                        getTaskConfig().setFilehandlesDriver(2);
                        
                        final GroupReduceCombineDriver<Tuple2<Integer, 
Integer>, Tuple2<Integer, Integer>> testTask = 
-                                       new 
GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>();
+                                       new GroupReduceCombineDriver<>();
                        
                        Thread taskRunner = new Thread() {
                                @Override
@@ -200,9 +202,9 @@ public class CombineTaskTest
        //  Test Combiners
        // 
------------------------------------------------------------------------
        
-       @RichGroupReduceFunction.Combinable
-       public static class MockCombiningReduceStub extends 
-                       RichGroupReduceFunction<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>>
+       public static class MockCombiningReduceStub implements
+               GroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, 
Integer>>,
+               GroupCombineFunction<Tuple2<Integer, Integer>, Tuple2<Integer, 
Integer>>
        {
                private static final long serialVersionUID = 1L;
 
@@ -216,7 +218,7 @@ public class CombineTaskTest
                                sum += next.f1;
                        }
                        
-                       out.collect(new Tuple2<Integer, Integer>(key, sum));
+                       out.collect(new Tuple2<>(key, sum));
                }
                
                @Override
@@ -225,9 +227,9 @@ public class CombineTaskTest
                }
        }
        
-       @RichGroupReduceFunction.Combinable
-       public static final class MockFailingCombiningReduceStub extends 
-                       RichGroupReduceFunction<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>>
+       public static final class MockFailingCombiningReduceStub implements
+               GroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, 
Integer>>,
+               GroupCombineFunction<Tuple2<Integer, Integer>, Tuple2<Integer, 
Integer>>
        {
                private static final long serialVersionUID = 1L;
                
@@ -244,7 +246,7 @@ public class CombineTaskTest
                        }
                        
                        int resultValue = sum - key;
-                       out.collect(new Tuple2<Integer, Integer>(key, 
resultValue));
+                       out.collect(new Tuple2<>(key, resultValue));
                }
                
                @Override
@@ -262,7 +264,7 @@ public class CombineTaskTest
                        }
 
                        int resultValue = sum - key;
-                       out.collect(new Tuple2<Integer, Integer>(key, 
resultValue));
+                       out.collect(new Tuple2<>(key, resultValue));
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
index a00aea3..6ebafee 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
@@ -22,11 +22,12 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import 
org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
 import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
 import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
@@ -232,8 +233,9 @@ public class ReduceTaskExternalITCase extends 
DriverTestBase<RichGroupReduceFunc
                }
        }
        
-       @Combinable
-       public static class MockCombiningReduceStub extends 
RichGroupReduceFunction<Record, Record> {
+       public static class MockCombiningReduceStub implements
+               GroupReduceFunction<Record, Record>, 
GroupCombineFunction<Record, Record> {
+
                private static final long serialVersionUID = 1L;
 
                private final IntValue key = new IntValue();

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
index 531d8ba..904b81e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
@@ -23,11 +23,12 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import 
org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
 import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
 import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
@@ -289,9 +290,10 @@ public class ReduceTaskTest extends 
DriverTestBase<RichGroupReduceFunction<Recor
                        out.collect(element);
                }
        }
-       
-       @Combinable
-       public static class MockCombiningReduceStub extends 
RichGroupReduceFunction<Record, Record> {
+
+       public static class MockCombiningReduceStub
+               implements GroupReduceFunction<Record, Record>, 
GroupCombineFunction<Record, Record> {
+
                private static final long serialVersionUID = 1L;
                
                private final IntValue key = new IntValue();

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index 4d8e0de..c3c23de 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
 import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
@@ -32,7 +34,7 @@ import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.FlatMapDriver;
 import org.apache.flink.runtime.operators.FlatMapTaskTest.MockMapStub;
-import org.apache.flink.runtime.operators.ReduceTaskTest.MockReduceStub;
+import 
org.apache.flink.runtime.operators.ReduceTaskTest.MockCombiningReduceStub;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.testutils.TaskTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
@@ -94,7 +96,7 @@ public class ChainTaskTest extends TaskTestBase {
                                
combineConfig.setRelativeMemoryDriver(memoryFraction);
                                
                                // udf
-                               combineConfig.setStubWrapper(new 
UserCodeClassWrapper<>(MockReduceStub.class));
+                               combineConfig.setStubWrapper(new 
UserCodeClassWrapper<>(MockCombiningReduceStub.class));
                                
                                
getTaskConfig().addChainedTask(SynchronousChainedCombineDriver.class, 
combineConfig, "combine");
                        }
@@ -184,7 +186,9 @@ public class ChainTaskTest extends TaskTestBase {
                }
        }
        
-       public static final class MockFailingCombineStub extends 
RichGroupReduceFunction<Record, Record> {
+       public static final class MockFailingCombineStub implements
+               GroupReduceFunction<Record, Record>,
+               GroupCombineFunction<Record, Record> {
                private static final long serialVersionUID = 1L;
                
                private int cnt = 0;
@@ -199,5 +203,10 @@ public class ChainTaskTest extends TaskTestBase {
                                out.collect(r);
                        }
                }
+
+               @Override
+               public void combine(Iterable<Record> values, Collector<Record> 
out) throws Exception {
+                       reduce(values, out);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index 7b7b940..38e3db8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -23,6 +23,7 @@ import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -252,7 +253,10 @@ public class CombiningUnilateralSortMergerITCase {
 
        // 
--------------------------------------------------------------------------------------------
        
-       public static class TestCountCombiner extends 
RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+       public static class TestCountCombiner
+               extends RichGroupReduceFunction<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>>
+               implements GroupCombineFunction<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>>
+       {
                private static final long serialVersionUID = 1L;
                
                private Integer count = 0;
@@ -290,7 +294,10 @@ public class CombiningUnilateralSortMergerITCase {
                }
        }
 
-       public static class TestCountCombiner2 extends 
RichGroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
+       public static class TestCountCombiner2
+               extends RichGroupReduceFunction<Tuple2<Integer, String>, 
Tuple2<Integer, String>>
+               implements GroupCombineFunction<Tuple2<Integer, String>, 
Tuple2<Integer, String>>
+       {
                private static final long serialVersionUID = 1L;
                
                public volatile boolean opened = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
 
b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
index ab32ca6..42f0e70 100644
--- 
a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
+++ 
b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
@@ -22,9 +22,9 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import 
org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
@@ -52,9 +52,9 @@ import scala.Product;
  */
 public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, 
ScalaAggregateOperator<IN>> {
 
-       private final List<AggregationFunction<?>> aggregationFunctions = new 
ArrayList<AggregationFunction<?>>(4);
+       private final List<AggregationFunction<?>> aggregationFunctions = new 
ArrayList<>(4);
 
-       private final List<Integer> fields = new ArrayList<Integer>(4);
+       private final List<Integer> fields = new ArrayList<>(4);
 
        private final Grouping<IN> grouping;
 
@@ -170,7 +170,7 @@ public class ScalaAggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Scal
                // distinguish between grouped reduce and non-grouped reduce
                if (this.grouping == null) {
                        // non grouped aggregation
-                       UnaryOperatorInformation<IN, IN> operatorInfo = new 
UnaryOperatorInformation<IN, IN>(getInputType(), getResultType());
+                       UnaryOperatorInformation<IN, IN> operatorInfo = new 
UnaryOperatorInformation<>(getInputType(), getResultType());
                        GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, 
IN>> po =
                                        new GroupReduceOperatorBase<IN, IN, 
GroupReduceFunction<IN, IN>>(function, operatorInfo, new int[0], name);
 
@@ -187,7 +187,7 @@ public class ScalaAggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Scal
                if (this.grouping.getKeys() instanceof Keys.ExpressionKeys) {
                        // grouped aggregation
                        int[] logicalKeyPositions = 
this.grouping.getKeys().computeLogicalKeyPositions();
-                       UnaryOperatorInformation<IN, IN> operatorInfo = new 
UnaryOperatorInformation<IN, IN>(getInputType(), getResultType());
+                       UnaryOperatorInformation<IN, IN> operatorInfo = new 
UnaryOperatorInformation<>(getInputType(), getResultType());
                        GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, 
IN>> po =
                                        new GroupReduceOperatorBase<IN, IN, 
GroupReduceFunction<IN, IN>>(function, operatorInfo, logicalKeyPositions, name);
 
@@ -230,8 +230,10 @@ public class ScalaAggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Scal
 
        // 
--------------------------------------------------------------------------------------------
 
-       @Combinable
-       public static final class AggregatingUdf<T extends Product> extends 
RichGroupReduceFunction<T, T> {
+       public static final class AggregatingUdf<T extends Product>
+               extends RichGroupReduceFunction<T, T>
+               implements GroupCombineFunction<T, T>
+       {
                private static final long serialVersionUID = 1L;
 
                private final int[] fieldPositions;
@@ -294,5 +296,10 @@ public class ScalaAggregateOperator<IN> extends 
SingleInputOperator<IN, IN, Scal
                        out.collect(result);
                }
 
+               @Override
+               public void combine(Iterable<T> records, Collector<T> out) {
+                       reduce(records, out);
+               }
+
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
index d7d45fe..b4015e5 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.accumulators.DoubleCounter;
 import org.apache.flink.api.common.accumulators.Histogram;
 import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.DataSet;
@@ -74,7 +75,7 @@ public class AccumulatorITCase extends JavaProgramTestBase {
                System.out.println("Accumulator results:");
                JobExecutionResult res = this.result;
                
System.out.println(AccumulatorHelper.getResultsFormated(res.getAllAccumulatorResults()));
-               
+
                Assert.assertEquals(Integer.valueOf(3), (Integer) 
res.getAccumulatorResult("num-lines"));
 
                Assert.assertEquals(Double.valueOf(getParallelism()), 
(Double)res.getAccumulatorResult("open-close-counter"));
@@ -128,7 +129,7 @@ public class AccumulatorITCase extends JavaProgramTestBase {
                        
getRuntimeContext().addAccumulator("open-close-counter", this.openCloseCounter);
 
                        // Add custom counter
-                       this.distinctWords = new SetAccumulator<StringValue>();
+                       this.distinctWords = new SetAccumulator<>();
                        
this.getRuntimeContext().addAccumulator("distinct-words", distinctWords);
 
                        // Create counter and test increment
@@ -164,7 +165,7 @@ public class AccumulatorITCase extends JavaProgramTestBase {
                        
                        for (String token : value.toLowerCase().split("\\W+")) {
                                distinctWords.add(new StringValue(token));
-                               out.collect(new Tuple2<String, Integer>(token, 
1));
+                               out.collect(new Tuple2<>(token, 1));
                                ++ wordsPerLine;
                        }
                        wordsPerLineDistribution.add(wordsPerLine);
@@ -179,7 +180,10 @@ public class AccumulatorITCase extends JavaProgramTestBase 
{
        }
 
        
-       public static class CountWords extends 
RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+       public static class CountWords
+               extends RichGroupReduceFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>>
+               implements GroupCombineFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>>
+       {
                
                private IntCounter reduceCalls;
                private IntCounter combineCalls;
@@ -210,7 +214,7 @@ public class AccumulatorITCase extends JavaProgramTestBase {
                                key = e.f0;
                                sum += e.f1;
                        }
-                       out.collect(new Tuple2<String, Integer>(key, sum));
+                       out.collect(new Tuple2<>(key, sum));
                }
        }
        
@@ -221,7 +225,7 @@ public class AccumulatorITCase extends JavaProgramTestBase {
 
                private static final long serialVersionUID = 1L;
 
-               private HashSet<T> set = new HashSet<T>();
+               private HashSet<T> set = new HashSet<>();
 
                @Override
                public void add(T value) {
@@ -246,7 +250,7 @@ public class AccumulatorITCase extends JavaProgramTestBase {
 
                @Override
                public Accumulator<T, HashSet<T>> clone() {
-                       SetAccumulator<T> result = new SetAccumulator<T>();
+                       SetAccumulator<T> result = new SetAccumulator<>();
                        result.set.addAll(set);
                        return result;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 95a8cb0..075e60c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.test.javaApiOperators;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -465,7 +466,7 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
 
                @Override
                public Tuple2<Integer, Long> 
getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
-                       return new Tuple2<Integer, Long>(t.f0, t.f4);
+                       return new Tuple2<>(t.f0, t.f4);
                }
        }
 
@@ -525,7 +526,7 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
                                c++; // haha
                                n = 
v.nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal;
                        }
-                       out.collect(new Tuple2<String, Integer>(n,c));
+                       out.collect(new Tuple2<>(n,c));
                }
        }
 
@@ -770,7 +771,7 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
 
                @Override
                public Tuple2<Integer, Integer> getKey(CustomType value) throws 
Exception {
-                       return new Tuple2<Integer, Integer>(value.myInt, 
value.myInt);
+                       return new Tuple2<>(value.myInt, value.myInt);
                }
        }
 
@@ -894,7 +895,7 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
 
                @Override
                public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, 
Integer, String, Long> in) {
-                       return new Tuple2<Long, Integer>(in.f4, in.f2);
+                       return new Tuple2<>(in.f4, in.f2);
                }
        }
 
@@ -1086,7 +1087,7 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
                                                                                
        k = v.f0;
                                                                                
        s += v.f1;
                                                                                
}
-                                                                               
out.collect(new Tuple2<Integer, Long>(k, s));
+                                                                               
out.collect(new Tuple2<>(k, s));
                                                                        }
                                                                }
                                                                );
@@ -1131,7 +1132,7 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
        @Test
        public void testJodatimeDateTimeWithKryo() throws Exception {
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple2<Integer, DateTime>> ds = env.fromElements(new 
Tuple2<Integer, DateTime>(1, DateTime.now()));
+               DataSet<Tuple2<Integer, DateTime>> ds = env.fromElements(new 
Tuple2<>(1, DateTime.now()));
                DataSet<Tuple2<Integer, DateTime>> reduceDs = 
ds.groupBy("f1").sum(0).project(0);
 
                List<Tuple2<Integer, DateTime>> result = reduceDs.collect();
@@ -1150,9 +1151,9 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
        public void testDateNullException() throws Exception {
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-               DataSet<Tuple2<Integer, Date>> in = env.fromElements(new 
Tuple2<Integer, Date>(0, new Date(1230000000)),
+               DataSet<Tuple2<Integer, Date>> in = env.fromElements(new 
Tuple2<>(0, new Date(1230000000)),
                                new Tuple2<Integer, Date>(1, null),
-                               new Tuple2<Integer, Date>(2, new 
Date(1230000000))
+                               new Tuple2<>(2, new Date(1230000000))
                );
 
                DataSet<String> r = in.groupBy(0).reduceGroup(new 
GroupReduceFunction<Tuple2<Integer, Date>, String>() {
@@ -1207,14 +1208,14 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
                @Override
                public void reduce(Iterable<Tuple3<Integer, Long, String>> 
values, Collector<Tuple2<Integer, Long>> out) {
                        int i = 0;
-                       long l = 0l;
+                       long l = 0L;
 
                        for (Tuple3<Integer, Long, String> t : values) {
                                i += t.f0;
                                l = t.f1;
                        }
 
-                       out.collect(new Tuple2<Integer, Long>(i, l));
+                       out.collect(new Tuple2<>(i, l));
 
                }
        }
@@ -1239,7 +1240,7 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
                                concat.setLength(concat.length() - 1);
                        }
 
-                       out.collect(new Tuple3<Integer, Long, String>(sum, key, 
concat.toString()));
+                       out.collect(new Tuple3<>(sum, key, concat.toString()));
                }
        }
 
@@ -1252,8 +1253,8 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
                                Collector<Tuple5<Integer, Long, Integer, 
String, Long>> out)
                {
                        int i = 0;
-                       long l = 0l;
-                       long l2 = 0l;
+                       long l = 0L;
+                       long l2 = 0L;
 
                        for ( Tuple5<Integer, Long, Integer, String, Long> t : 
values ) {
                                i = t.f0;
@@ -1261,7 +1262,7 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
                                l2 = t.f4;
                        }
 
-                       out.collect(new Tuple5<Integer, Long, Integer, String, 
Long>(i, l, 0, "P-)", l2));
+                       out.collect(new Tuple5<>(i, l, 0, "P-)", l2));
                }
        }
 
@@ -1274,8 +1275,8 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
                                Collector<Tuple5<Integer, Long, Integer, 
String, Long>> out)
                {
                        int i = 0;
-                       long l = 0l;
-                       long l2 = 0l;
+                       long l = 0L;
+                       long l2 = 0L;
                        StringBuilder concat = new StringBuilder();
 
                        for ( Tuple5<Integer, Long, Integer, String, Long> t : 
values ) {
@@ -1288,7 +1289,7 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
                                concat.setLength(concat.length() - 1);
                        }
 
-                       out.collect(new Tuple5<Integer, Long, Integer, String, 
Long>(i, l, 0, concat.toString(), l2));
+                       out.collect(new Tuple5<>(i, l, 0, concat.toString(), 
l2));
                }
        }
 
@@ -1372,14 +1373,14 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
                public void reduce(Iterable<Tuple3<Integer, Long, String>> 
values, Collector<Tuple3<Integer, Long, String>> out) {
 
                        int i = 0;
-                       long l = 0l;
+                       long l = 0L;
 
                        for ( Tuple3<Integer, Long, String> t : values ) {
                                i += t.f0;
                                l += t.f1;
                        }
 
-                       out.collect(new Tuple3<Integer, Long, String>(i, l, 
"Hello World"));
+                       out.collect(new Tuple3<>(i, l, "Hello World"));
                }
        }
 
@@ -1420,26 +1421,28 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
                public void reduce(Iterable<Tuple3<Integer, Long, String>> 
values, Collector<Tuple3<Integer, Long, String>> out) {
 
                        int i = 0;
-                       long l = 0l;
+                       long l = 0L;
 
                        for ( Tuple3<Integer, Long, String> t : values ) {
                                i += t.f0;
                                l = t.f1;
                        }
 
-                       out.collect(new Tuple3<Integer, Long, String>(i, l, 
this.f2Replace));
+                       out.collect(new Tuple3<>(i, l, this.f2Replace));
 
                }
        }
 
-       @RichGroupReduceFunction.Combinable
-       public static class Tuple3GroupReduceWithCombine extends 
RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> 
{
+       public static class Tuple3GroupReduceWithCombine
+               implements GroupReduceFunction<Tuple3<Integer, Long, String>, 
Tuple2<Integer, String>>,
+                                       GroupCombineFunction<Tuple3<Integer, 
Long, String>, Tuple3<Integer, Long, String>>
+       {
                private static final long serialVersionUID = 1L;
 
                @Override
                public void combine(Iterable<Tuple3<Integer, Long, String>> 
values, Collector<Tuple3<Integer, Long, String>> out) {
 
-                       Tuple3<Integer, Long, String> o = new Tuple3<Integer, 
Long, String>(0, 0l, "");
+                       Tuple3<Integer, Long, String> o = new Tuple3<>(0, 0L, 
"");
 
                        for ( Tuple3<Integer, Long, String> t : values ) {
                                o.f0 += t.f0;
@@ -1461,13 +1464,14 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
                                s = t.f2;
                        }
 
-                       out.collect(new Tuple2<Integer, String>(i, s));
+                       out.collect(new Tuple2<>(i, s));
 
                }
        }
 
-       @RichGroupReduceFunction.Combinable
-       public static class Tuple3SortedGroupReduceWithCombine extends 
RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> 
{
+       public static class Tuple3SortedGroupReduceWithCombine
+               implements GroupReduceFunction<Tuple3<Integer, Long, String>, 
Tuple2<Integer, String>>,
+                                       GroupCombineFunction<Tuple3<Integer, 
Long, String>, Tuple3<Integer, Long, String>> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -1486,7 +1490,7 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
                                concat.setLength(concat.length() - 1);
                        }
 
-                       out.collect(new Tuple3<Integer, Long, String>(sum, key, 
concat.toString()));
+                       out.collect(new Tuple3<>(sum, key, concat.toString()));
                }
 
                @Override
@@ -1499,18 +1503,19 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
                                s = t.f2;
                        }
 
-                       out.collect(new Tuple2<Integer, String>(i, s));
+                       out.collect(new Tuple2<>(i, s));
                }
        }
 
-       @RichGroupReduceFunction.Combinable
-       public static class Tuple3AllGroupReduceWithCombine extends 
RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> 
{
+       public static class Tuple3AllGroupReduceWithCombine
+               implements GroupReduceFunction<Tuple3<Integer, Long, String>, 
Tuple2<Integer, String>>,
+                                       GroupCombineFunction<Tuple3<Integer, 
Long, String>, Tuple3<Integer, Long, String>> {
                private static final long serialVersionUID = 1L;
 
                @Override
                public void combine(Iterable<Tuple3<Integer, Long, String>> 
values, Collector<Tuple3<Integer, Long, String>> out) {
 
-                       Tuple3<Integer, Long, String> o = new Tuple3<Integer, 
Long, String>(0, 0l, "");
+                       Tuple3<Integer, Long, String> o = new Tuple3<>(0, 0L, 
"");
 
                        for ( Tuple3<Integer, Long, String> t : values ) {
                                o.f0 += t.f0;
@@ -1532,13 +1537,14 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
                                s += t.f2;
                        }
 
-                       out.collect(new Tuple2<Integer, String>(i, s));
+                       out.collect(new Tuple2<>(i, s));
 
                }
        }
 
-       @RichGroupReduceFunction.Combinable
-       public static class CustomTypeGroupReduceWithCombine extends 
RichGroupReduceFunction<CustomType, CustomType> {
+       public static class CustomTypeGroupReduceWithCombine
+               implements GroupReduceFunction<CustomType, CustomType>,
+                                       GroupCombineFunction<CustomType, 
CustomType> {
                private static final long serialVersionUID = 1L;
 
                @Override
@@ -1571,8 +1577,9 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
                }
        }
 
-       @RichGroupReduceFunction.Combinable
-       public static class OrderCheckingCombinableReduce extends 
RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, 
String>> {
+       public static class OrderCheckingCombinableReduce
+               implements GroupReduceFunction<Tuple3<Integer, Long, String>, 
Tuple3<Integer, Long, String>>,
+                                       GroupCombineFunction<Tuple3<Integer, 
Long, String>, Tuple3<Integer, Long, String>> {
                private static final long serialVersionUID = 1L;
 
                @Override

Reply via email to