Stephan Ewen created FLINK-1272:
-----------------------------------

    Summary: Add a "reduceWithKey" function
    Key: FLINK-1272
    URL: https://issues.apache.org/jira/browse/FLINK-1272
    Project: Flink
    Issue Type: New Feature
    Components: Java API, Scala API
    Reporter: Stephan Ewen
Flink does not assume a key/value model for grouping, aggregating, or joining. Keys are specified as positions or paths into the objects being grouped or joined. Currently, we do not expose the key in the {{ReduceFunction}} and {{GroupReduceFunction}}; instead, we pass the objects themselves (or iterators over them).

Since accessing the key is a common case, I suggest adding a convenience function {{GroupReduceWithKeyFunction}} with the following signature:

{code}
public interface GroupReduceWithKeyFunction<KEY, IN, OUT> {
    void reduceGroup(KEY key, Iterable<IN> values, Collector<OUT> out);
}
{code}

It could be called as follows.

Scala:
{code}
val data: DataSet[SomePOJO] = ...
data
  .groupBy("id")
  .reduceGroup( (key, values, out: Collector[(String, Long)]) =>
    out.collect( (key, values.minBy(_.timestamp).timestamp) ) )
{code}

Java:
{code}
DataSet<SomePOJO> data = ...
data
  .groupBy("id")
  .reduceGroup(new GroupReduceWithKeyFunction<String, SomePOJO, Tuple2<String, Long>>() { ... });
{code}

The sae
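To illustrate the intended semantics of the proposed interface, here is a minimal, Flink-independent sketch: records are grouped in memory by a key selector, and the user function is called once per group with the key passed explicitly. It is not Flink's runtime. {{Collector}}, {{SomePOJO}}, {{KeyedGroupReduceSketch}}, {{reduceGroupWithKey}} and {{MinTimestampPerId}} are hypothetical stand-ins defined here, and {{Map.Entry<String, Long>}} substitutes for {{Tuple2<String, Long>}} so the example runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for Flink's Collector so the sketch compiles on its own.
interface Collector<T> {
    void collect(T record);
}

// The key-aware group-reduce signature proposed in this issue.
interface GroupReduceWithKeyFunction<KEY, IN, OUT> {
    void reduceGroup(KEY key, Iterable<IN> values, Collector<OUT> out);
}

// Hypothetical POJO matching the examples: grouped by "id", carrying a timestamp.
class SomePOJO {
    public String id;
    public long timestamp;

    public SomePOJO(String id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }
}

final class KeyedGroupReduceSketch {
    // Groups the input in memory (preserving first-seen key order) and invokes the
    // function once per group, handing it the key instead of making it read one element.
    static <K, IN, OUT> List<OUT> reduceGroupWithKey(
            List<IN> input,
            Function<IN, K> keySelector,
            GroupReduceWithKeyFunction<K, IN, OUT> fn) {
        Map<K, List<IN>> groups = new LinkedHashMap<>();
        for (IN record : input) {
            groups.computeIfAbsent(keySelector.apply(record), k -> new ArrayList<>()).add(record);
        }
        List<OUT> result = new ArrayList<>();
        for (Map.Entry<K, List<IN>> group : groups.entrySet()) {
            // result::add adapts the output list to the Collector interface.
            fn.reduceGroup(group.getKey(), group.getValue(), result::add);
        }
        return result;
    }

    // Emits (id, smallest timestamp) per group, like minBy(_.timestamp) in the Scala example.
    static final class MinTimestampPerId
            implements GroupReduceWithKeyFunction<String, SomePOJO, Map.Entry<String, Long>> {
        @Override
        public void reduceGroup(String key, Iterable<SomePOJO> values,
                                Collector<Map.Entry<String, Long>> out) {
            long min = Long.MAX_VALUE;
            for (SomePOJO p : values) {
                min = Math.min(min, p.timestamp);
            }
            out.collect(Map.entry(key, min));
        }
    }
}
```

Passing the key as a separate argument means the function no longer has to peek at the first element of the group to recover it, which is the convenience this issue asks for.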