[FLINK-1045][dataSet] Remove Combinable annotation. This closes #1522
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5914e9ab Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5914e9ab Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5914e9ab Branch: refs/heads/master Commit: 5914e9ab3fa8e8be8fff9d7e6c10d612af7e27cf Parents: af029e7 Author: Fabian Hueske <fhue...@apache.org> Authored: Mon Jan 18 15:57:49 2016 +0100 Committer: Fabian Hueske <fhue...@apache.org> Committed: Tue Jan 26 11:17:24 2016 +0100 ---------------------------------------------------------------------- docs/apis/batch/dataset_transformations.md | 111 ++++++++++--------- .../mapred/HadoopReduceCombineFunction.java | 22 ++-- .../common/functions/GroupReduceFunction.java | 5 + .../functions/RichGroupReduceFunction.java | 56 +--------- .../flink/api/java/functions/FirstReducer.java | 2 - .../api/java/operators/AggregateOperator.java | 55 ++++----- .../api/java/operators/DistinctOperator.java | 16 ++- .../api/java/operators/GroupReduceOperator.java | 53 ++++++++- .../PlanUnwrappingReduceGroupOperator.java | 45 +++++--- ...PlanUnwrappingSortedReduceGroupOperator.java | 33 ++++-- .../runtime/ExpressionAggregateFunction.scala | 23 +++- .../java/GroupReduceCompilationTest.java | 40 +++++-- .../testfunctions/IdentityGroupReducer.java | 5 +- .../IdentityGroupReducerCombinable.java | 13 ++- .../testfunctions/Top1GroupReducer.java | 13 ++- .../operators/CombineTaskExternalITCase.java | 13 ++- .../runtime/operators/CombineTaskTest.java | 34 +++--- .../operators/ReduceTaskExternalITCase.java | 8 +- .../flink/runtime/operators/ReduceTaskTest.java | 10 +- .../operators/chaining/ChainTaskTest.java | 15 ++- .../CombiningUnilateralSortMergerITCase.java | 11 +- .../scala/operators/ScalaAggregateOperator.java | 21 ++-- .../test/accumulators/AccumulatorITCase.java | 18 +-- .../javaApiOperators/GroupReduceITCase.java | 81 +++++++------- .../examples/KMeansSingleStepTest.java | 18 +-- 
.../examples/RelationalQueryCompilerTest.java | 21 ++-- .../api/scala/operators/GroupReduceITCase.scala | 59 +++++----- 27 files changed, 469 insertions(+), 332 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/docs/apis/batch/dataset_transformations.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/dataset_transformations.md b/docs/apis/batch/dataset_transformations.md index 763de0f..31a1dfa 100644 --- a/docs/apis/batch/dataset_transformations.md +++ b/docs/apis/batch/dataset_transformations.md @@ -551,43 +551,49 @@ val output = input.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup { #### Combinable GroupReduceFunctions In contrast to a reduce function, a group-reduce function is not -necessarily combinable. In order to make a group-reduce function -combinable, you need to use the `RichGroupReduceFunction` variant, -implement (override) the `combine()` method, and annotate the -`RichGroupReduceFunction` with the `@Combinable` annotation as shown here: +implicitly combinable. In order to make a group-reduce function +combinable it must implement the `GroupCombineFunction` interface. + +**Important**: The generic input and output types of +the `GroupCombineFunction` interface must be equal to the generic input type +of the `GroupReduceFunction` as shown in the following example: <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> ~~~java -// Combinable GroupReduceFunction that computes two sums. -// Note that we use the RichGroupReduceFunction because it defines the combine method -@Combinable -public class MyCombinableGroupReducer - extends RichGroupReduceFunction<Tuple3<String, Integer, Double>, - Tuple3<String, Integer, Double>> { +// Combinable GroupReduceFunction that computes a sum. 
+public class MyCombinableGroupReducer implements + GroupReduceFunction<Tuple2<String, Integer>, String>, + GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> +{ @Override - public void reduce(Iterable<Tuple3<String, Integer, Double>> in, - Collector<Tuple3<String, Integer, Double>> out) { + public void reduce(Iterable<Tuple2<String, Integer>> in, + Collector<String> out) { String key = null; - int intSum = 0; - double doubleSum = 0.0; + int sum = 0; - for (Tuple3<String, Integer, Double> curr : in) { + for (Tuple2<String, Integer> curr : in) { key = curr.f0; - intSum += curr.f1; - doubleSum += curr.f2; + sum += curr.f1; } - // emit a tuple with both sums - out.collect(new Tuple3<String, Integer, Double>(key, intSum, doubleSum)); + // concat key and sum and emit + out.collect(key + "-" + sum); } @Override - public void combine(Iterable<Tuple3<String, Integer, Double>> in, - Collector<Tuple3<String, Integer, Double>> out) { - // in some cases combine() calls can simply be forwarded to reduce(). - this.reduce(in, out); + public void combine(Iterable<Tuple2<String, Integer>> in, + Collector<Tuple2<String, Integer>> out) { + String key = null; + int sum = 0; + + for (Tuple2<String, Integer> curr : in) { + key = curr.f0; + sum += curr.f1; + } + // emit tuple with key and sum + out.collect(new Tuple2<>(key, sum)); } } ~~~ @@ -598,33 +604,28 @@ public class MyCombinableGroupReducer ~~~scala // Combinable GroupReduceFunction that computes two sums. 
-// Note that we use the RichGroupReduceFunction because it defines the combine method -@Combinable class MyCombinableGroupReducer - extends RichGroupReduceFunction[(String, Int, Double), (String, Int, Double)] {} - - def reduce( - in: java.lang.Iterable[(String, Int, Double)], - out: Collector[(String, Int, Double)]): Unit = { - - val key: String = null - val intSum = 0 - val doubleSum = 0.0 - - for (curr <- in) { - key = curr._1 - intSum += curr._2 - doubleSum += curr._3 - } - // emit a tuple with both sums - out.collect(key, intSum, doubleSum); + extends GroupReduceFunction[(String, Int), String] + with GroupCombineFunction[(String, Int), (String, Int)] +{ + override def reduce( + in: java.lang.Iterable[(String, Int)], + out: Collector[String]): Unit = + { + val r: (String, Int) = + in.asScala.reduce( (a,b) => (a._1, a._2 + b._2) ) + // concat key and sum and emit + out.collect (r._1 + "-" + r._2) } - def combine( - in: java.lang.Iterable[(String, Int, Double)], - out: Collector[(String, Int, Double)]): Unit = { - // in some cases combine() calls can simply be forwarded to reduce(). - this.reduce(in, out) + override def combine( + in: java.lang.Iterable[(String, Int)], + out: Collector[(String, Int)]): Unit = + { + val r: (String, Int) = + in.asScala.reduce( (a,b) => (a._1, a._2 + b._2) ) + // emit tuple with key and sum + out.collect(r) } } ~~~ @@ -635,14 +636,16 @@ class MyCombinableGroupReducer ~~~python class GroupReduce(GroupReduceFunction): def reduce(self, iterator, collector): - key, int_sum, float_sum = iterator.next() + key, int_sum = iterator.next() for value in iterator: int_sum += value[1] - float_sum += value[2] - collector.collect((key, int_sum, float_sum)) - # in some cases combine() calls can simply be forwarded to reduce(). 
+ collector.collect(key + "-" + str(int_sum)) + def combine(self, iterator, collector): - return self.reduce(iterator, collector) + key, int_sum = iterator.next() + for value in iterator: + int_sum += value[1] + collector.collect((key, int_sum)) data.reduce_group(GroupReduce(), combinable=True) ~~~ @@ -653,7 +656,7 @@ data.reduce_group(GroupReduce(), combinable=True) ### GroupCombine on a Grouped DataSet The GroupCombine transformation is the generalized form of the combine step in -the Combinable GroupReduceFunction. It is generalized in the sense that it +the combinable GroupReduceFunction. It is generalized in the sense that it allows combining of input type `I` to an arbitrary output type `O`. In contrast, the combine step in the GroupReduce only allows combining from input type `I` to output type `I`. This is because the reduce step in the GroupReduceFunction @@ -872,7 +875,7 @@ val output = input.reduceGroup(new MyGroupReducer()) **Note:** A GroupReduce transformation on a full DataSet cannot be done in parallel if the group-reduce function is not combinable. Therefore, this can be a very compute intensive operation. -See the paragraph on "Combineable Group-Reduce Functions" above to learn how to implement a +See the paragraph on "Combinable GroupReduceFunctions" above to learn how to implement a combinable group-reduce function. 
### GroupCombine on a full DataSet http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java index 97b9768..e164d89 100644 --- a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java @@ -23,6 +23,7 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -44,10 +45,10 @@ import org.apache.hadoop.mapred.Reporter; * This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction. 
*/ @SuppressWarnings("rawtypes") -@org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> - extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> - implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable { + extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> + implements GroupCombineFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYIN,VALUEIN>>, + ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable { private static final long serialVersionUID = 1L; @@ -104,10 +105,10 @@ public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> this.reporter = new HadoopDummyReporter(); Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0); - TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass((Class<KEYIN>) inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig()); - this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(keySerializer); - this.combineCollector = new HadoopOutputCollector<KEYIN, VALUEIN>(); - this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>(); + TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig()); + this.valueIterator = new HadoopTupleUnwrappingIterator<>(keySerializer); + this.combineCollector = new HadoopOutputCollector<>(); + this.reduceCollector = new HadoopOutputCollector<>(); } @Override @@ -131,9 +132,9 @@ public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2); Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3); - final TypeInformation<KEYOUT> keyTypeInfo = 
TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass); - final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass); - return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo); + final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(outKeyClass); + final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass(outValClass); + return new TupleTypeInfo<>(keyTypeInfo, valueTypleInfo); } /** @@ -161,4 +162,5 @@ public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> jobConf = new JobConf(); jobConf.readFields(in); } + } http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java index 37490c5..42b8e0c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/GroupReduceFunction.java @@ -38,6 +38,11 @@ import org.apache.flink.util.Collector; * * DataSet<X> result = input.groupBy(<key-definition>).reduceGroup(new MyGroupReduceFunction()); * }</pre> + * + * Partial computation can significantly improve the performance of a {@link GroupReduceFunction}. + * This technique is also known as applying a Combiner. + * Implement the {@link GroupCombineFunction GroupCombineFunction&lt;T, T&gt;} interface to enable partial computations, i.e., + * a combiner for this {@link GroupReduceFunction}. * * @param <T> Type of the elements that this function processes. * @param <O> The type of the elements returned by the user-defined function. 
http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java index 94d60a8..2c74902 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichGroupReduceFunction.java @@ -18,11 +18,6 @@ package org.apache.flink.api.common.functions; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - import org.apache.flink.annotation.Public; import org.apache.flink.util.Collector; @@ -31,60 +26,21 @@ import org.apache.flink.util.Collector; * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and * {@link RichFunction#close()}. + * + * Partial computation can significantly improve the performance of a {@link RichGroupReduceFunction}. + * This technique is also known as applying a Combiner. + * Implement the {@link GroupCombineFunction GroupCombineFunction&lt;IN, IN&gt;} interface to enable partial computation, i.e., + * a combiner for this {@link RichGroupReduceFunction}. * * @param <IN> Type of the elements that this function processes. * @param <OUT> The type of the elements returned by the user-defined function. 
*/ @Public -public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT>, GroupCombineFunction<IN, IN> { +public abstract class RichGroupReduceFunction<IN, OUT> extends AbstractRichFunction implements GroupReduceFunction<IN, OUT> { private static final long serialVersionUID = 1L; @Override public abstract void reduce(Iterable<IN> values, Collector<OUT> out) throws Exception; - /** - * The combine methods pre-reduces elements. It may be called on subsets of the data - * before the actual reduce function. This is often helpful to lower data volume prior - * to reorganizing the data in an expensive way, as might be required for the final - * reduce function. - * <p> - * This method is only ever invoked when the subclass of {@link RichGroupReduceFunction} - * adds the {@link Combinable} annotation, or if the <i>combinable</i> flag is set when defining - * the <i>reduceGroup</i> operation via - * org.apache.flink.api.java.operators.GroupReduceOperator#setCombinable(boolean). - * <p> - * Since the reduce function will be called on the result of this method, it is important that this - * method returns the same data type as it consumes. By default, this method only calls the - * {@link #reduce(Iterable, Collector)} method. If the behavior in the pre-reducing is different - * from the final reduce function (for example because the reduce function changes the data type), - * this method must be overwritten, or the execution will fail. - * - * @param values The iterator returning the group of values to be reduced. - * @param out The collector to emit the returned values. - * - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. 
- */ - @Override - public void combine(Iterable<IN> values, Collector<IN> out) throws Exception { - @SuppressWarnings("unchecked") - Collector<OUT> c = (Collector<OUT>) out; - reduce(values, c); - } - - // -------------------------------------------------------------------------------------------- - - /** - * This annotation can be added to classes that extend {@link RichGroupReduceFunction}, in oder to mark - * them as "combinable". The system may call the {@link RichGroupReduceFunction#combine(Iterable, Collector)} - * method on such functions, to pre-reduce the data before transferring it over the network to - * the actual group reduce operation. - * <p> - * Marking combinable functions as such is in general beneficial for performance. - */ - @Retention(RetentionPolicy.RUNTIME) - @Target(ElementType.TYPE) - @Public - public static @interface Combinable {} } http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java index a604cc0..2eda077 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java @@ -19,10 +19,8 @@ package org.apache.flink.api.java.functions; import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; import org.apache.flink.util.Collector; -@Combinable public class FirstReducer<T> implements GroupReduceFunction<T, T>, GroupCombineFunction<T, T> { private static final long serialVersionUID = 1L; 
http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java index d65ba0d..17dff69 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java @@ -19,13 +19,12 @@ package org.apache.flink.api.java.operators; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction; -import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.UnaryOperatorInformation; @@ -49,9 +48,9 @@ import com.google.common.base.Preconditions; */ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, AggregateOperator<IN>> { - private final List<AggregationFunction<?>> aggregationFunctions = new ArrayList<AggregationFunction<?>>(4); + private final List<AggregationFunction<?>> aggregationFunctions = new ArrayList<>(4); - private final List<Integer> fields = new ArrayList<Integer>(4); + private final List<Integer> fields = new ArrayList<>(4); private final Grouping<IN> grouping; @@ -186,7 +185,7 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate // distinguish between grouped reduce and non-grouped reduce if 
(this.grouping == null) { // non grouped aggregation - UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(getInputType(), getResultType()); + UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType()); GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> po = new GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>>(function, operatorInfo, new int[0], name); @@ -203,7 +202,7 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate if (this.grouping.getKeys() instanceof Keys.ExpressionKeys) { // grouped aggregation int[] logicalKeyPositions = this.grouping.getKeys().computeLogicalKeyPositions(); - UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(getInputType(), getResultType()); + UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType()); GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> po = new GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>>(function, operatorInfo, logicalKeyPositions, name); @@ -214,19 +213,17 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate po.setCustomPartitioner(grouping.getCustomPartitioner()); SingleInputSemanticProperties props = new SingleInputSemanticProperties(); - - for (int i = 0; i < logicalKeyPositions.length; i++) { - int keyField = logicalKeyPositions[i]; + + for (int keyField : logicalKeyPositions) { boolean keyFieldUsedInAgg = false; - - for (int k = 0; k < fields.length; k++) { - int aggField = fields[k]; + + for (int aggField : fields) { if (keyField == aggField) { keyFieldUsedInAgg = true; break; } } - + if (!keyFieldUsedInAgg) { props.addForwardedField(keyField, keyField); } @@ -247,8 +244,10 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate // 
-------------------------------------------------------------------------------------------- - @Combinable - public static final class AggregatingUdf<T extends Tuple> extends RichGroupReduceFunction<T, T> { + public static final class AggregatingUdf<T extends Tuple> + extends RichGroupReduceFunction<T, T> + implements GroupCombineFunction<T, T> { + private static final long serialVersionUID = 1L; private final int[] fieldPositions; @@ -268,8 +267,8 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate @Override public void open(Configuration parameters) throws Exception { - for (int i = 0; i < aggFunctions.length; i++) { - aggFunctions[i].initializeAggregate(); + for (AggregationFunction<Object> aggFunction : aggFunctions) { + aggFunction.initializeAggregate(); } } @@ -280,24 +279,28 @@ public class AggregateOperator<IN> extends SingleInputOperator<IN, IN, Aggregate // aggregators are initialized from before - T current = null; - final Iterator<T> values = records.iterator(); - while (values.hasNext()) { - current = values.next(); - + T outT = null; + for (T record : records) { + outT = record; + for (int i = 0; i < fieldPositions.length; i++) { - Object val = current.getFieldNotNull(fieldPositions[i]); - aggFunctions[i].aggregate(val); + Object val = record.getFieldNotNull(fieldPositions[i]); + aggFunctions[i].aggregate(val); } } for (int i = 0; i < fieldPositions.length; i++) { Object aggVal = aggFunctions[i].getAggregate(); - current.setField(aggVal, fieldPositions[i]); + outT.setField(aggVal, fieldPositions[i]); aggFunctions[i].initializeAggregate(); } - out.collect(current); + out.collect(outT); + } + + @Override + public void combine(Iterable<T> records, Collector<T> out) { + reduce(records, out); } } http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java ---------------------------------------------------------------------- diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java index d85c9a6..5102c80 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java @@ -18,13 +18,13 @@ package org.apache.flink.api.java.operators; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys; import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator; import org.apache.flink.api.java.tuple.Tuple2; @@ -59,7 +59,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera @Override protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, T, ?> translateToDataFlow(Operator<T> input) { - final RichGroupReduceFunction<T, T> function = new DistinctFunction<>(); + final GroupReduceFunction<T, T> function = new DistinctFunction<>(); String name = getName() != null ? 
getName() : "Distinct at " + distinctLocationName; @@ -68,7 +68,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera int[] logicalKeyPositions = keys.computeLogicalKeyPositions(); UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType()); GroupReduceOperatorBase<T, T, GroupReduceFunction<T, T>> po = - new GroupReduceOperatorBase<T, T, GroupReduceFunction<T, T>>(function, operatorInfo, logicalKeyPositions, name); + new GroupReduceOperatorBase<>(function, operatorInfo, logicalKeyPositions, name); po.setCombinable(true); po.setInput(input); @@ -108,7 +108,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> translateSelectorFunctionDistinct( SelectorFunctionKeys<IN, ?> rawKeys, - RichGroupReduceFunction<IN, OUT> function, + GroupReduceFunction<IN, OUT> function, TypeInformation<OUT> outputType, String name, Operator<IN> input) @@ -126,8 +126,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera return reducer; } - @RichGroupReduceFunction.Combinable - public static final class DistinctFunction<T> extends RichGroupReduceFunction<T, T> { + public static final class DistinctFunction<T> implements GroupReduceFunction<T, T>, GroupCombineFunction<T, T> { private static final long serialVersionUID = 1L; @@ -135,5 +134,10 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera public void reduce(Iterable<T> values, Collector<T> out) { out.collect(values.iterator().next()); } + + @Override + public void combine(Iterable<T> values, Collector<T> out) { + out.collect(values.iterator().next()); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java ---------------------------------------------------------------------- diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java index 5225b33..91f2efd 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java @@ -27,7 +27,6 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.java.functions.SemanticPropUtil; import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys; import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator; @@ -35,6 +34,11 @@ import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedReduc import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.DataSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; /** * This operator represents the application of a "reduceGroup" function on a data set, and the @@ -45,6 +49,8 @@ import org.apache.flink.api.java.DataSet; */ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, GroupReduceOperator<IN, OUT>> { + private static final Logger LOG = LoggerFactory.getLogger(GroupReduceOperator.class); + private final GroupReduceFunction<IN, OUT> function; private final Grouping<IN> grouper; @@ -88,9 +94,48 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT } private void checkCombinability() { - if (function instanceof GroupCombineFunction && 
- function.getClass().getAnnotation(RichGroupReduceFunction.Combinable.class) != null) { - this.combinable = true; + if (function instanceof GroupCombineFunction) { + + // check if the generic types of GroupCombineFunction and GroupReduceFunction match, i.e., + // GroupCombineFunction<IN, IN> and GroupReduceFunction<IN, OUT>. + // This is a best effort check. If the check cannot be done, we might fail at runtime. + Type[] reduceTypes = null; + Type[] combineTypes = null; + + Type[] genInterfaces = function.getClass().getGenericInterfaces(); + for (Type genInterface : genInterfaces) { + if (genInterface instanceof ParameterizedType) { + // get parameters of GroupReduceFunction + if (((ParameterizedType) genInterface).getRawType().equals(GroupReduceFunction.class)) { + reduceTypes = ((ParameterizedType) genInterface).getActualTypeArguments(); + // get parameters of GroupCombineFunction + } else if (((ParameterizedType) genInterface).getRawType().equals(GroupCombineFunction.class)) { + combineTypes = ((ParameterizedType) genInterface).getActualTypeArguments(); + } + } + } + + if (reduceTypes != null && reduceTypes.length == 2 && + combineTypes != null && combineTypes.length == 2) { + + if (reduceTypes[0].equals(combineTypes[0]) && reduceTypes[0].equals(combineTypes[1])) { + this.combinable = true; + } else { + LOG.warn("GroupCombineFunction cannot be used as combiner for GroupReduceFunction. " + + "Generic types are incompatible."); + this.combinable = false; + } + } + else if (reduceTypes == null || reduceTypes.length != 2) { + LOG.warn("Cannot check generic types of GroupReduceFunction. " + + "Enabling combiner but combine function might fail at runtime."); + this.combinable = true; + } + else { + LOG.warn("Cannot check generic types of GroupCombineFunction. 
" + + "Enabling combiner but combine function might fail at runtime."); + this.combinable = true; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java index e01af50..72b79a4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.java.operators.Keys; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; @@ -34,31 +33,43 @@ import org.apache.flink.util.Collector; */ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOperatorBase<Tuple2<K, IN>, OUT, GroupReduceFunction<Tuple2<K, IN>,OUT>> { - public PlanUnwrappingReduceGroupOperator(GroupReduceFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name, - TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey, boolean combinable) + public PlanUnwrappingReduceGroupOperator( + GroupReduceFunction<IN, OUT> udf, + Keys.SelectorFunctionKeys<IN, K> key, + String name, + TypeInformation<OUT> outType, + TypeInformation<Tuple2<K, IN>> 
typeInfoWithKey, + boolean combinable) { - super(combinable ? new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K>((RichGroupReduceFunction<IN, OUT>) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf), - new UnaryOperatorInformation<Tuple2<K, IN>, OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name); + super( + combinable ? + new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K>(udf) : + new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf), + new UnaryOperatorInformation<>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name); super.setCombinable(combinable); } // -------------------------------------------------------------------------------------------- - @RichGroupReduceFunction.Combinable - public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>> + public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>> implements GroupReduceFunction<Tuple2<K, IN>, OUT>, GroupCombineFunction<Tuple2<K, IN>, Tuple2<K, IN>> { private static final long serialVersionUID = 1L; private TupleUnwrappingIterator<IN, K> iter; - private TupleWrappingCollector<IN, K> coll; - - private TupleUnwrappingGroupCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> wrapped) { + private TupleWrappingCollector<IN, K> coll; + + private TupleUnwrappingGroupCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) { super(wrapped); - this.iter = new TupleUnwrappingIterator<IN, K>(); - this.coll = new TupleWrappingCollector<IN, K>(this.iter); + + if(!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) { + throw new IllegalArgumentException("Wrapped reduce function does not implement the GroupCombineFunction interface."); + } + + this.iter = new TupleUnwrappingIterator<>(); + this.coll = new TupleWrappingCollector<>(this.iter); } @@ -68,11 +79,13 @@ 
public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp this.wrappedFunction.reduce(iter, out); } + @SuppressWarnings("unchecked") @Override public void combine(Iterable<Tuple2<K, IN>> values, Collector<Tuple2<K, IN>> out) throws Exception { - iter.set(values.iterator()); - coll.set(out); - this.wrappedFunction.combine(iter, coll); + + iter.set(values.iterator()); + coll.set(out); + ((GroupCombineFunction<IN, IN>)this.wrappedFunction).combine(iter, coll); } @Override @@ -91,7 +104,7 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp private TupleUnwrappingNonCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) { super(wrapped); - this.iter = new TupleUnwrappingIterator<IN, K>(); + this.iter = new TupleUnwrappingIterator<>(); } http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java index e4d41f4..278d706 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.UnaryOperatorInformation; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.java.operators.Keys; import 
org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.util.Collector; @@ -34,18 +33,26 @@ import org.apache.flink.util.Collector; */ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends GroupReduceOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupReduceFunction<Tuple3<K1, K2, IN>,OUT>> { - public PlanUnwrappingSortedReduceGroupOperator(GroupReduceFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name, - TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey, boolean combinable) + public PlanUnwrappingSortedReduceGroupOperator( + GroupReduceFunction<IN, OUT> udf, + Keys.SelectorFunctionKeys<IN, K1> groupingKey, + Keys.SelectorFunctionKeys<IN, K2> sortingKey, + String name, + TypeInformation<OUT> outType, + TypeInformation<Tuple3<K1, K2, IN>> + typeInfoWithKey, boolean combinable) { - super(combinable ? new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2>(udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2>(udf), - new UnaryOperatorInformation<Tuple3<K1, K2, IN>, OUT>(typeInfoWithKey, outType), groupingKey.computeLogicalKeyPositions(), name); + super( + combinable ? 
+ new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2>(udf) : + new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2>(udf), + new UnaryOperatorInformation<>(typeInfoWithKey, outType), groupingKey.computeLogicalKeyPositions(), name); super.setCombinable(combinable); } // -------------------------------------------------------------------------------------------- - @RichGroupReduceFunction.Combinable public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupReduceFunction<IN, OUT>> implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, GroupCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>> { @@ -57,8 +64,13 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr private TupleUnwrappingGroupCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) { super(wrapped); - this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>(); - this.coll = new Tuple3WrappingCollector<IN, K1, K2>(this.iter); + + if(!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) { + throw new IllegalArgumentException("Wrapped reduce function does not implement the GroupCombineFunction interface."); + } + + this.iter = new Tuple3UnwrappingIterator<>(); + this.coll = new Tuple3WrappingCollector<>(this.iter); } @@ -68,11 +80,12 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr this.wrappedFunction.reduce(iter, out); } + @SuppressWarnings("unchecked") @Override public void combine(Iterable<Tuple3<K1, K2, IN>> values, Collector<Tuple3<K1, K2, IN>> out) throws Exception { iter.set(values.iterator()); coll.set(out); - ((GroupCombineFunction)this.wrappedFunction).combine(iter, coll); + ((GroupCombineFunction<IN, IN>)this.wrappedFunction).combine(iter, coll); } @Override @@ -91,7 +104,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr private TupleUnwrappingNonCombinableGroupReducer(GroupReduceFunction<IN, 
OUT> wrapped) { super(wrapped); - this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>(); + this.iter = new Tuple3UnwrappingIterator<>(); } http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala index 932f9df..38afc21 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala @@ -18,17 +18,17 @@ package org.apache.flink.api.table.runtime import org.apache.flink.api.table.Row -import org.apache.flink.api.common.functions.RichGroupReduceFunction -import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable +import org.apache.flink.api.common.functions.{GroupReduceFunction, GroupCombineFunction, RichGroupReduceFunction} import org.apache.flink.api.java.aggregation.AggregationFunction import org.apache.flink.configuration.Configuration import org.apache.flink.util.Collector -@Combinable class ExpressionAggregateFunction( private val fieldPositions: Seq[Int], private val functions: Seq[AggregationFunction[Any]]) - extends RichGroupReduceFunction[Row, Row] { + extends RichGroupReduceFunction[Row, Row] + with GroupCombineFunction[Row, Row] +{ override def open(conf: Configuration): Unit = { var i = 0 @@ -69,10 +69,17 @@ class ExpressionAggregateFunction( out.collect(current) } + override def combine(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = { + reduce(in, out) + } + } -@Combinable -class NoExpressionAggregateFunction() extends 
RichGroupReduceFunction[Row, Row] { + +class NoExpressionAggregateFunction() + extends GroupReduceFunction[Row, Row] + with GroupCombineFunction[Row, Row] +{ override def reduce(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = { @@ -86,4 +93,8 @@ class NoExpressionAggregateFunction() extends RichGroupReduceFunction[Row, Row] out.collect(first) } + override def combine(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = { + reduce(in, out) + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java index 6b49dd4..26a2faf 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java @@ -19,6 +19,8 @@ package org.apache.flink.optimizer.java; import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.GroupCombineFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.java.functions.KeySelector; @@ -92,11 +94,9 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java env.setParallelism(8); DataSet<Long> data = env.generateSequence(1, 8000000).name("source"); - - GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new RichGroupReduceFunction<Long, Long>() { - public void reduce(Iterable<Long> values, Collector<Long> out) {} - }).name("reducer"); - + + GroupReduceOperator<Long, Long> reduced = 
data.reduceGroup(new CombineReducer2()).name("reducer"); + reduced.setCombinable(true); reduced.output(new DiscardingOutputFormat<Long>()).name("sink"); @@ -195,9 +195,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data .groupBy(1) - .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() { - public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {} - }).name("reducer"); + .reduceGroup(new CombineReducer()).name("reducer"); reduced.setCombinable(true); reduced.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink"); @@ -313,9 +311,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java .groupBy(new KeySelector<Tuple2<String,Double>, String>() { public String getKey(Tuple2<String, Double> value) { return value.f0; } }) - .reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() { - public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {} - }).name("reducer"); + .reduceGroup(new CombineReducer()).name("reducer"); reduced.setCombinable(true); reduced.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink"); @@ -366,4 +362,26 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java fail(e.getClass().getSimpleName() + " in test: " + e.getMessage()); } } + + public static class CombineReducer implements + GroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>, + GroupCombineFunction<Tuple2<String, Double>, Tuple2<String, Double>> { + + @Override + public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {} + + @Override + public void combine(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {} + } + + public static class 
CombineReducer2 implements + GroupReduceFunction<Long, Long>, + GroupCombineFunction<Long, Long> { + + @Override + public void reduce(Iterable<Long> values, Collector<Long> out) {} + + @Override + public void combine(Iterable<Long> values, Collector<Long> out) {} + } } http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java index da4ef17..0336771 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducer.java @@ -19,11 +19,10 @@ package org.apache.flink.optimizer.testfunctions; -import org.apache.flink.api.common.functions.RichGroupReduceFunction; -import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.util.Collector; -public class IdentityGroupReducer<T> extends RichGroupReduceFunction<T, T> { +public class IdentityGroupReducer<T> implements GroupReduceFunction<T, T> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java index ce24bb6..e1dbdd5 100644 --- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityGroupReducerCombinable.java @@ -19,12 +19,12 @@ package org.apache.flink.optimizer.testfunctions; -import org.apache.flink.api.common.functions.RichGroupReduceFunction; -import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; +import org.apache.flink.api.common.functions.GroupCombineFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.util.Collector; -@Combinable -public class IdentityGroupReducerCombinable<T> extends RichGroupReduceFunction<T, T> { +public class IdentityGroupReducerCombinable<T> + implements GroupReduceFunction<T, T>, GroupCombineFunction<T, T> { private static final long serialVersionUID = 1L; @@ -34,4 +34,9 @@ public class IdentityGroupReducerCombinable<T> extends RichGroupReduceFunction<T out.collect(next); } } + + @Override + public void combine(Iterable<T> values, Collector<T> out) { + reduce(values, out); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java index 48d13ca..7213a25 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/Top1GroupReducer.java @@ -18,13 +18,13 @@ package org.apache.flink.optimizer.testfunctions; -import org.apache.flink.api.common.functions.RichGroupReduceFunction; -import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; +import 
org.apache.flink.api.common.functions.GroupCombineFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.util.Collector; -@Combinable -public class Top1GroupReducer<T> extends RichGroupReduceFunction<T, T> { +public class Top1GroupReducer<T> + implements GroupReduceFunction<T, T>, GroupCombineFunction<T, T> { private static final long serialVersionUID = 1L; @@ -32,4 +32,9 @@ public class Top1GroupReducer<T> extends RichGroupReduceFunction<T, T> { public void reduce(Iterable<T> values, Collector<T> out) { out.collect(values.iterator().next()); } + + @Override + public void combine(Iterable<T> values, Collector<T> out) { + out.collect(values.iterator().next()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java index e162d7d..1699f79 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java @@ -22,11 +22,12 @@ import java.util.ArrayList; import java.util.HashMap; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.GroupCombineFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.util.Collector; import org.junit.Assert; import org.apache.flink.api.common.functions.RichGroupReduceFunction; -import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; import 
org.apache.flink.runtime.testutils.recordutils.RecordComparator; import org.apache.flink.runtime.operators.testutils.DriverTestBase; import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; @@ -166,8 +167,9 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun // ------------------------------------------------------------------------ // ------------------------------------------------------------------------ - @Combinable - public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> { + public static class MockCombiningReduceStub implements + GroupReduceFunction<Record, Record>, GroupCombineFunction<Record, Record> + { private static final long serialVersionUID = 1L; private final IntValue theInteger = new IntValue(); @@ -194,8 +196,9 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun } } - @Combinable - public static final class MockFailingCombiningReduceStub extends RichGroupReduceFunction<Record, Record> { + public static final class MockFailingCombiningReduceStub implements + GroupReduceFunction<Record, Record>, GroupCombineFunction<Record, Record> + { private static final long serialVersionUID = 1L; private int cnt = 0; http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java index b6ce2d4..f0d9a8c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; +import 
org.apache.flink.api.common.functions.GroupCombineFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -51,15 +53,15 @@ public class CombineTaskTest private final double combine_frac; - private final ArrayList<Tuple2<Integer, Integer>> outList = new ArrayList<Tuple2<Integer, Integer>>(); + private final ArrayList<Tuple2<Integer, Integer>> outList = new ArrayList<>(); @SuppressWarnings("unchecked") - private final TypeSerializer<Tuple2<Integer, Integer>> serializer = new TupleSerializer<Tuple2<Integer, Integer>>( + private final TypeSerializer<Tuple2<Integer, Integer>> serializer = new TupleSerializer<>( (Class<Tuple2<Integer, Integer>>) (Class<?>) Tuple2.class, new TypeSerializer<?>[] { IntSerializer.INSTANCE, IntSerializer.INSTANCE }); - private final TypeComparator<Tuple2<Integer, Integer>> comparator = new TupleComparator<Tuple2<Integer, Integer>>( + private final TypeComparator<Tuple2<Integer, Integer>> comparator = new TupleComparator<>( new int[]{0}, new TypeComparator<?>[] { new IntComparator(true) }, new TypeSerializer<?>[] { IntSerializer.INSTANCE }); @@ -88,7 +90,7 @@ public class CombineTaskTest getTaskConfig().setFilehandlesDriver(2); final GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask = - new GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>(); + new GroupReduceCombineDriver<>(); testDriver(testTask, MockCombiningReduceStub.class); @@ -127,7 +129,7 @@ public class CombineTaskTest getTaskConfig().setFilehandlesDriver(2); final GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask = - new GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>(); + new GroupReduceCombineDriver<>(); try { testDriver(testTask, 
MockFailingCombiningReduceStub.class); @@ -147,7 +149,7 @@ public class CombineTaskTest public void testCancelCombineTaskSorting() { try { MutableObjectIterator<Tuple2<Integer, Integer>> slowInfiniteInput = - new DelayingIterator<Tuple2<Integer, Integer>>(new InfiniteIntTupleIterator(), 1); + new DelayingIterator<>(new InfiniteIntTupleIterator(), 1); setInput(slowInfiniteInput, serializer); addDriverComparator(this.comparator); @@ -159,7 +161,7 @@ public class CombineTaskTest getTaskConfig().setFilehandlesDriver(2); final GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask = - new GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>(); + new GroupReduceCombineDriver<>(); Thread taskRunner = new Thread() { @Override @@ -200,9 +202,9 @@ public class CombineTaskTest // Test Combiners // ------------------------------------------------------------------------ - @RichGroupReduceFunction.Combinable - public static class MockCombiningReduceStub extends - RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> + public static class MockCombiningReduceStub implements + GroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>, + GroupCombineFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> { private static final long serialVersionUID = 1L; @@ -216,7 +218,7 @@ public class CombineTaskTest sum += next.f1; } - out.collect(new Tuple2<Integer, Integer>(key, sum)); + out.collect(new Tuple2<>(key, sum)); } @Override @@ -225,9 +227,9 @@ public class CombineTaskTest } } - @RichGroupReduceFunction.Combinable - public static final class MockFailingCombiningReduceStub extends - RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> + public static final class MockFailingCombiningReduceStub implements + GroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>, + GroupCombineFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> { private static 
final long serialVersionUID = 1L; @@ -244,7 +246,7 @@ public class CombineTaskTest } int resultValue = sum - key; - out.collect(new Tuple2<Integer, Integer>(key, resultValue)); + out.collect(new Tuple2<>(key, resultValue)); } @Override @@ -262,7 +264,7 @@ public class CombineTaskTest } int resultValue = sum - key; - out.collect(new Tuple2<Integer, Integer>(key, resultValue)); + out.collect(new Tuple2<>(key, resultValue)); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java index a00aea3..6ebafee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java @@ -22,11 +22,12 @@ import java.util.ArrayList; import java.util.List; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.GroupCombineFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.RichGroupReduceFunction; -import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; import org.apache.flink.runtime.testutils.recordutils.RecordComparator; import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory; import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger; @@ -232,8 +233,9 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc } } - @Combinable - public static class MockCombiningReduceStub extends 
RichGroupReduceFunction<Record, Record> { + public static class MockCombiningReduceStub implements + GroupReduceFunction<Record, Record>, GroupCombineFunction<Record, Record> { + private static final long serialVersionUID = 1L; private final IntValue key = new IntValue(); http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java index 531d8ba..904b81e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java @@ -23,11 +23,12 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.GroupCombineFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.RichGroupReduceFunction; -import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; import org.apache.flink.runtime.testutils.recordutils.RecordComparator; import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory; import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger; @@ -289,9 +290,10 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor out.collect(element); } } - - @Combinable - public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> { + + public static class MockCombiningReduceStub + implements GroupReduceFunction<Record, Record>, GroupCombineFunction<Record, Record> { + 
private static final long serialVersionUID = 1L; private final IntValue key = new IntValue(); http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java index 4d8e0de..c3c23de 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java @@ -22,6 +22,8 @@ import java.util.ArrayList; import java.util.List; import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.GroupCombineFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory; import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory; @@ -32,7 +34,7 @@ import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.runtime.operators.FlatMapDriver; import org.apache.flink.runtime.operators.FlatMapTaskTest.MockMapStub; -import org.apache.flink.runtime.operators.ReduceTaskTest.MockReduceStub; +import org.apache.flink.runtime.operators.ReduceTaskTest.MockCombiningReduceStub; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.testutils.TaskTestBase; import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; @@ -94,7 +96,7 @@ public class ChainTaskTest extends TaskTestBase { combineConfig.setRelativeMemoryDriver(memoryFraction); // udf - 
combineConfig.setStubWrapper(new UserCodeClassWrapper<>(MockReduceStub.class)); + combineConfig.setStubWrapper(new UserCodeClassWrapper<>(MockCombiningReduceStub.class)); getTaskConfig().addChainedTask(SynchronousChainedCombineDriver.class, combineConfig, "combine"); } @@ -184,7 +186,9 @@ public class ChainTaskTest extends TaskTestBase { } } - public static final class MockFailingCombineStub extends RichGroupReduceFunction<Record, Record> { + public static final class MockFailingCombineStub implements + GroupReduceFunction<Record, Record>, + GroupCombineFunction<Record, Record> { private static final long serialVersionUID = 1L; private int cnt = 0; @@ -199,5 +203,10 @@ public class ChainTaskTest extends TaskTestBase { out.collect(r); } } + + @Override + public void combine(Iterable<Record> values, Collector<Record> out) throws Exception { + reduce(values, out); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java index 7b7b940..38e3db8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java @@ -23,6 +23,7 @@ import java.util.Hashtable; import java.util.Iterator; import java.util.NoSuchElementException; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -252,7 +253,10 @@ public class CombiningUnilateralSortMergerITCase { // 
-------------------------------------------------------------------------------------------- - public static class TestCountCombiner extends RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> { + public static class TestCountCombiner + extends RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> + implements GroupCombineFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> + { private static final long serialVersionUID = 1L; private Integer count = 0; @@ -290,7 +294,10 @@ public class CombiningUnilateralSortMergerITCase { } } - public static class TestCountCombiner2 extends RichGroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> { + public static class TestCountCombiner2 + extends RichGroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> + implements GroupCombineFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> + { private static final long serialVersionUID = 1L; public volatile boolean opened = false; http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java index ab32ca6..42f0e70 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java @@ -22,9 +22,9 @@ import java.util.ArrayList; import java.util.List; import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction; -import 
org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.UnaryOperatorInformation; @@ -52,9 +52,9 @@ import scala.Product; */ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, ScalaAggregateOperator<IN>> { - private final List<AggregationFunction<?>> aggregationFunctions = new ArrayList<AggregationFunction<?>>(4); + private final List<AggregationFunction<?>> aggregationFunctions = new ArrayList<>(4); - private final List<Integer> fields = new ArrayList<Integer>(4); + private final List<Integer> fields = new ArrayList<>(4); private final Grouping<IN> grouping; @@ -170,7 +170,7 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal // distinguish between grouped reduce and non-grouped reduce if (this.grouping == null) { // non grouped aggregation - UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(getInputType(), getResultType()); + UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType()); GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> po = new GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>>(function, operatorInfo, new int[0], name); @@ -187,7 +187,7 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal if (this.grouping.getKeys() instanceof Keys.ExpressionKeys) { // grouped aggregation int[] logicalKeyPositions = this.grouping.getKeys().computeLogicalKeyPositions(); - UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<IN, IN>(getInputType(), getResultType()); + UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType()); GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>> po = new 
GroupReduceOperatorBase<IN, IN, GroupReduceFunction<IN, IN>>(function, operatorInfo, logicalKeyPositions, name); @@ -230,8 +230,10 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal // -------------------------------------------------------------------------------------------- - @Combinable - public static final class AggregatingUdf<T extends Product> extends RichGroupReduceFunction<T, T> { + public static final class AggregatingUdf<T extends Product> + extends RichGroupReduceFunction<T, T> + implements GroupCombineFunction<T, T> + { private static final long serialVersionUID = 1L; private final int[] fieldPositions; @@ -294,5 +296,10 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal out.collect(result); } + @Override + public void combine(Iterable<T> records, Collector<T> out) { + reduce(records, out); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java index d7d45fe..b4015e5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.accumulators.DoubleCounter; import org.apache.flink.api.common.accumulators.Histogram; import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import 
org.apache.flink.api.java.DataSet; @@ -74,7 +75,7 @@ public class AccumulatorITCase extends JavaProgramTestBase { System.out.println("Accumulator results:"); JobExecutionResult res = this.result; System.out.println(AccumulatorHelper.getResultsFormated(res.getAllAccumulatorResults())); - + Assert.assertEquals(Integer.valueOf(3), (Integer) res.getAccumulatorResult("num-lines")); Assert.assertEquals(Double.valueOf(getParallelism()), (Double)res.getAccumulatorResult("open-close-counter")); @@ -128,7 +129,7 @@ public class AccumulatorITCase extends JavaProgramTestBase { getRuntimeContext().addAccumulator("open-close-counter", this.openCloseCounter); // Add custom counter - this.distinctWords = new SetAccumulator<StringValue>(); + this.distinctWords = new SetAccumulator<>(); this.getRuntimeContext().addAccumulator("distinct-words", distinctWords); // Create counter and test increment @@ -164,7 +165,7 @@ public class AccumulatorITCase extends JavaProgramTestBase { for (String token : value.toLowerCase().split("\\W+")) { distinctWords.add(new StringValue(token)); - out.collect(new Tuple2<String, Integer>(token, 1)); + out.collect(new Tuple2<>(token, 1)); ++ wordsPerLine; } wordsPerLineDistribution.add(wordsPerLine); @@ -179,7 +180,10 @@ public class AccumulatorITCase extends JavaProgramTestBase { } - public static class CountWords extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> { + public static class CountWords + extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> + implements GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> + { private IntCounter reduceCalls; private IntCounter combineCalls; @@ -210,7 +214,7 @@ public class AccumulatorITCase extends JavaProgramTestBase { key = e.f0; sum += e.f1; } - out.collect(new Tuple2<String, Integer>(key, sum)); + out.collect(new Tuple2<>(key, sum)); } } @@ -221,7 +225,7 @@ public class AccumulatorITCase extends JavaProgramTestBase { private static 
final long serialVersionUID = 1L; - private HashSet<T> set = new HashSet<T>(); + private HashSet<T> set = new HashSet<>(); @Override public void add(T value) { @@ -246,7 +250,7 @@ public class AccumulatorITCase extends JavaProgramTestBase { @Override public Accumulator<T, HashSet<T>> clone() { - SetAccumulator<T> result = new SetAccumulator<T>(); + SetAccumulator<T> result = new SetAccumulator<>(); result.set.addAll(set); return result; } http://git-wip-us.apache.org/repos/asf/flink/blob/5914e9ab/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java index 95a8cb0..075e60c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.test.javaApiOperators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.functions.RichMapFunction; @@ -465,7 +466,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { @Override public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) { - return new Tuple2<Integer, Long>(t.f0, t.f4); + return new Tuple2<>(t.f0, t.f4); } } @@ -525,7 +526,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { c++; // haha n = v.nest_Lvl1.nest_Lvl2.nest_Lvl3.nest_Lvl4.f1nal; } - out.collect(new Tuple2<String, Integer>(n,c)); + out.collect(new Tuple2<>(n,c)); } } @@ -770,7 +771,7 @@ public class 
GroupReduceITCase extends MultipleProgramsTestBase { @Override public Tuple2<Integer, Integer> getKey(CustomType value) throws Exception { - return new Tuple2<Integer, Integer>(value.myInt, value.myInt); + return new Tuple2<>(value.myInt, value.myInt); } } @@ -894,7 +895,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { @Override public Tuple2<Long, Integer> getKey(Tuple5<Integer, Long, Integer, String, Long> in) { - return new Tuple2<Long, Integer>(in.f4, in.f2); + return new Tuple2<>(in.f4, in.f2); } } @@ -1086,7 +1087,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { k = v.f0; s += v.f1; } - out.collect(new Tuple2<Integer, Long>(k, s)); + out.collect(new Tuple2<>(k, s)); } } ); @@ -1131,7 +1132,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { @Test public void testJodatimeDateTimeWithKryo() throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple2<Integer, DateTime>> ds = env.fromElements(new Tuple2<Integer, DateTime>(1, DateTime.now())); + DataSet<Tuple2<Integer, DateTime>> ds = env.fromElements(new Tuple2<>(1, DateTime.now())); DataSet<Tuple2<Integer, DateTime>> reduceDs = ds.groupBy("f1").sum(0).project(0); List<Tuple2<Integer, DateTime>> result = reduceDs.collect(); @@ -1150,9 +1151,9 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { public void testDateNullException() throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple2<Integer, Date>> in = env.fromElements(new Tuple2<Integer, Date>(0, new Date(1230000000)), + DataSet<Tuple2<Integer, Date>> in = env.fromElements(new Tuple2<>(0, new Date(1230000000)), new Tuple2<Integer, Date>(1, null), - new Tuple2<Integer, Date>(2, new Date(1230000000)) + new Tuple2<>(2, new Date(1230000000)) ); DataSet<String> r = in.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<Integer, Date>, String>() { @@ -1207,14 +1208,14 
@@ public class GroupReduceITCase extends MultipleProgramsTestBase { @Override public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, Long>> out) { int i = 0; - long l = 0l; + long l = 0L; for (Tuple3<Integer, Long, String> t : values) { i += t.f0; l = t.f1; } - out.collect(new Tuple2<Integer, Long>(i, l)); + out.collect(new Tuple2<>(i, l)); } } @@ -1239,7 +1240,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { concat.setLength(concat.length() - 1); } - out.collect(new Tuple3<Integer, Long, String>(sum, key, concat.toString())); + out.collect(new Tuple3<>(sum, key, concat.toString())); } } @@ -1252,8 +1253,8 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { Collector<Tuple5<Integer, Long, Integer, String, Long>> out) { int i = 0; - long l = 0l; - long l2 = 0l; + long l = 0L; + long l2 = 0L; for ( Tuple5<Integer, Long, Integer, String, Long> t : values ) { i = t.f0; @@ -1261,7 +1262,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { l2 = t.f4; } - out.collect(new Tuple5<Integer, Long, Integer, String, Long>(i, l, 0, "P-)", l2)); + out.collect(new Tuple5<>(i, l, 0, "P-)", l2)); } } @@ -1274,8 +1275,8 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { Collector<Tuple5<Integer, Long, Integer, String, Long>> out) { int i = 0; - long l = 0l; - long l2 = 0l; + long l = 0L; + long l2 = 0L; StringBuilder concat = new StringBuilder(); for ( Tuple5<Integer, Long, Integer, String, Long> t : values ) { @@ -1288,7 +1289,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { concat.setLength(concat.length() - 1); } - out.collect(new Tuple5<Integer, Long, Integer, String, Long>(i, l, 0, concat.toString(), l2)); + out.collect(new Tuple5<>(i, l, 0, concat.toString(), l2)); } } @@ -1372,14 +1373,14 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { public void reduce(Iterable<Tuple3<Integer, Long, String>> values, 
Collector<Tuple3<Integer, Long, String>> out) { int i = 0; - long l = 0l; + long l = 0L; for ( Tuple3<Integer, Long, String> t : values ) { i += t.f0; l += t.f1; } - out.collect(new Tuple3<Integer, Long, String>(i, l, "Hello World")); + out.collect(new Tuple3<>(i, l, "Hello World")); } } @@ -1420,26 +1421,28 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { public void reduce(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) { int i = 0; - long l = 0l; + long l = 0L; for ( Tuple3<Integer, Long, String> t : values ) { i += t.f0; l = t.f1; } - out.collect(new Tuple3<Integer, Long, String>(i, l, this.f2Replace)); + out.collect(new Tuple3<>(i, l, this.f2Replace)); } } - @RichGroupReduceFunction.Combinable - public static class Tuple3GroupReduceWithCombine extends RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> { + public static class Tuple3GroupReduceWithCombine + implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>>, + GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> + { private static final long serialVersionUID = 1L; @Override public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) { - Tuple3<Integer, Long, String> o = new Tuple3<Integer, Long, String>(0, 0l, ""); + Tuple3<Integer, Long, String> o = new Tuple3<>(0, 0L, ""); for ( Tuple3<Integer, Long, String> t : values ) { o.f0 += t.f0; @@ -1461,13 +1464,14 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { s = t.f2; } - out.collect(new Tuple2<Integer, String>(i, s)); + out.collect(new Tuple2<>(i, s)); } } - @RichGroupReduceFunction.Combinable - public static class Tuple3SortedGroupReduceWithCombine extends RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> { + public static class Tuple3SortedGroupReduceWithCombine + implements 
GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>>, + GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { private static final long serialVersionUID = 1L; @Override @@ -1486,7 +1490,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { concat.setLength(concat.length() - 1); } - out.collect(new Tuple3<Integer, Long, String>(sum, key, concat.toString())); + out.collect(new Tuple3<>(sum, key, concat.toString())); } @Override @@ -1499,18 +1503,19 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { s = t.f2; } - out.collect(new Tuple2<Integer, String>(i, s)); + out.collect(new Tuple2<>(i, s)); } } - @RichGroupReduceFunction.Combinable - public static class Tuple3AllGroupReduceWithCombine extends RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> { + public static class Tuple3AllGroupReduceWithCombine + implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>>, + GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { private static final long serialVersionUID = 1L; @Override public void combine(Iterable<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) { - Tuple3<Integer, Long, String> o = new Tuple3<Integer, Long, String>(0, 0l, ""); + Tuple3<Integer, Long, String> o = new Tuple3<>(0, 0L, ""); for ( Tuple3<Integer, Long, String> t : values ) { o.f0 += t.f0; @@ -1532,13 +1537,14 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { s += t.f2; } - out.collect(new Tuple2<Integer, String>(i, s)); + out.collect(new Tuple2<>(i, s)); } } - @RichGroupReduceFunction.Combinable - public static class CustomTypeGroupReduceWithCombine extends RichGroupReduceFunction<CustomType, CustomType> { + public static class CustomTypeGroupReduceWithCombine + implements GroupReduceFunction<CustomType, CustomType>, + GroupCombineFunction<CustomType, CustomType> { 
private static final long serialVersionUID = 1L; @Override @@ -1571,8 +1577,9 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { } } - @RichGroupReduceFunction.Combinable - public static class OrderCheckingCombinableReduce extends RichGroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { + public static class OrderCheckingCombinableReduce + implements GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>, + GroupCombineFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> { private static final long serialVersionUID = 1L; @Override