[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15301046#comment-15301046 ]
ASF GitHub Bot commented on FLINK-3477:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1517#discussion_r64667048

    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java ---
    @@ -135,10 +136,37 @@ public UnsortedGrouping(DataSet<T> set, Keys<T> keys) {
          * @see DataSet
          */
         public ReduceOperator<T> reduce(ReduceFunction<T> reducer) {
    +        return reduce(Utils.getCallLocationName(), reducer, CombineHint.OPTIMIZER_CHOOSES);
    +    }
    +
    +    /**
    +     * Applies a Reduce transformation on a grouped {@link DataSet}.<br>
    +     * For each group, the transformation consecutively calls a {@link org.apache.flink.api.common.functions.RichReduceFunction}
    +     * until only a single element for each group remains.
    +     * A ReduceFunction combines two elements into one new element of the same type.
    +     *
    +     * @param reducer The ReduceFunction that is applied on each group of the DataSet.
    +     * @param strategy The strategy that should be used to execute the combine phase of the reduce.
    +     *                 If {@code null} is given, then the optimizer will pick the strategy.
    +     * @return A ReduceOperator that represents the reduced DataSet.
    +     *
    +     * @see org.apache.flink.api.common.functions.RichReduceFunction
    +     * @see ReduceOperator
    +     * @see DataSet
    +     */
    +    public ReduceOperator<T> reduce(ReduceFunction<T> reducer, CombineHint strategy) {
    --- End diff --

    The `ReduceOperator.setCombineHint(CombineHint)` method should be annotated with `@PublicEvolving`.


> Add hash-based combine strategy for ReduceFunction
> --------------------------------------------------
>
>                 Key: FLINK-3477
>                 URL: https://issues.apache.org/jira/browse/FLINK-3477
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Local Runtime
>            Reporter: Fabian Hueske
>            Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a
> single value. A Reduce function is incrementally applied to compute a final
> aggregated value. This makes it possible to hold the preaggregated value in
> a hash table and update it with each function call.
> The hash-based strategy requires a special implementation of an in-memory
> hash table. The hash table should support in-place updates of elements (if
> the new value has the same binary length as the old value) but also
> appending updates with invalidation of the old value (if the binary length
> of the new value differs). The hash table needs to be able to evict and emit
> all elements if it runs out of memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the
> execution strategy.
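
For illustration, the hash-based combine described in the issue could be sketched roughly as follows. This is a minimal sketch and not Flink's implementation: the class `HashCombineSketch`, its methods, and the `maxEntries` limit are hypothetical names, a plain `java.util.HashMap` of objects stands in for the serialized in-memory hash table (so the in-place versus appending update distinction is not modeled), and an entry count stands in for the memory budget.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/** Hypothetical sketch of a hash-based combiner; not part of Flink. */
final class HashCombineSketch<K, T> {
    private final Function<T, K> keyOf;       // extracts the grouping key (assumption)
    private final BinaryOperator<T> reducer;  // stands in for reduce(T v1, T v2)
    private final int maxEntries;             // crude stand-in for a memory budget
    private final Map<K, T> table = new HashMap<>();
    private final List<T> emitted = new ArrayList<>();

    HashCombineSketch(Function<T, K> keyOf, BinaryOperator<T> reducer, int maxEntries) {
        this.keyOf = keyOf;
        this.reducer = reducer;
        this.maxEntries = maxEntries;
    }

    /** Folds one element into the pre-aggregated value held for its key. */
    void add(T value) {
        K key = keyOf.apply(value);
        if (table.containsKey(key)) {
            // Key already present: update the held partial aggregate.
            table.put(key, reducer.apply(table.get(key), value));
            return;
        }
        if (table.size() >= maxEntries) {
            // "Out of memory": evict and emit every partial aggregate.
            evictAll();
        }
        table.put(key, value);
    }

    /** Emits all partial aggregates and empties the table. */
    void evictAll() {
        emitted.addAll(table.values());
        table.clear();
    }

    /** Flushes the remaining entries and returns everything emitted so far. */
    List<T> finish() {
        evictAll();
        return emitted;
    }

    public static void main(String[] args) {
        // Sum integers grouped by parity, with room for a single table entry.
        HashCombineSketch<Integer, Integer> combiner =
                new HashCombineSketch<>(v -> v % 2, Integer::sum, 1);
        for (int v : new int[] {1, 3, 2, 4, 5}) {
            combiner.add(v);
        }
        System.out.println(combiner.finish()); // prints [4, 6, 5]
    }
}
```

Because of the eviction, the combiner can emit more than one partial aggregate per key (here 4 and 5 for the odd group), so a final reduce over the combined output is still needed to produce one value per group.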