Stephan Ewen created FLINK-1272:
-----------------------------------

             Summary: Add a "reduceWithKey" function
                 Key: FLINK-1272
                 URL: https://issues.apache.org/jira/browse/FLINK-1272
             Project: Flink
          Issue Type: New Feature
          Components: Java API, Scala API
            Reporter: Stephan Ewen


Flink does not assume a key/value model for grouping/aggregating/joining. The 
keys are specified as positions or paths of the objects to be grouped/joined.

Currently, we do not expose the key in the {{ReduceFunction}} and 
{{GroupReduceFunction}}, but give (iterators over) the objects themselves.

Since accessing the key is a common case, I suggest adding a convenience 
function {{GroupReduceWithKeyFunction}} that has the following signature and 
can be called as follows:

{code}
public interface GroupReduceWithKeyFunction<KEY, IN, OUT> {

    void reduceGroup(KEY key, Iterable<IN> values, Collector<OUT> out);
}
{code}


Scala:
{code}
val data: DataSet[SomePOJO] = ...
data
  .groupBy("id")
  // assumes SomePOJO.timestamp is a Long, to match the declared output type
  .reduceGroup( (key, values, out: Collector[(String, Long)]) =>
      out.collect( (key, values.map(_.timestamp).min) ) )
{code}

Java:
{code}
DataSet<SomePOJO> data = ...
data
  .groupBy("id")
  .reduceGroup(
      new GroupReduceWithKeyFunction<String, SomePOJO, Tuple2<String, Long>>() {
          ...
      });
{code}
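
To make the intended semantics concrete, here is a minimal sketch in plain Java that runs without Flink. It is not the proposed implementation: {{Collector}} is a local stand-in for Flink's interface, {{Event}} is a hypothetical POJO, {{groupAndReduce}} is a hypothetical in-memory helper that plays the role of {{groupBy(...).reduceGroup(...)}}, and {{Map.Entry}} stands in for {{Tuple2}}.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class GroupReduceWithKeySketch {

    /** Local stand-in for Flink's Collector, so the sketch has no dependencies. */
    public interface Collector<T> {
        void collect(T record);
    }

    /** The interface proposed in this issue. */
    public interface GroupReduceWithKeyFunction<KEY, IN, OUT> {
        void reduceGroup(KEY key, Iterable<IN> values, Collector<OUT> out);
    }

    /** Hypothetical POJO with a grouping field and a timestamp. */
    public static final class Event {
        public final String id;
        public final long timestamp;

        public Event(String id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }
    }

    /**
     * Hypothetical helper: groups the input in memory by the extracted key and
     * calls the function once per group, passing the key alongside the values.
     */
    public static <KEY, IN, OUT> List<OUT> groupAndReduce(
            Iterable<IN> input,
            Function<IN, KEY> keyExtractor,
            GroupReduceWithKeyFunction<KEY, IN, OUT> fn) {
        // LinkedHashMap keeps groups in order of first appearance.
        Map<KEY, List<IN>> groups = new LinkedHashMap<>();
        for (IN record : input) {
            groups.computeIfAbsent(keyExtractor.apply(record), k -> new ArrayList<>())
                  .add(record);
        }
        List<OUT> result = new ArrayList<>();
        for (Map.Entry<KEY, List<IN>> group : groups.entrySet()) {
            fn.reduceGroup(group.getKey(), group.getValue(), result::add);
        }
        return result;
    }

    /** Emits (id, smallest timestamp) per group; the key is used directly. */
    public static List<Map.Entry<String, Long>> minTimestampPerId(List<Event> events) {
        return groupAndReduce(
            events,
            (Event e) -> e.id,
            (String key, Iterable<Event> values, Collector<Map.Entry<String, Long>> out) -> {
                long min = Long.MAX_VALUE;
                for (Event e : values) {
                    min = Math.min(min, e.timestamp);
                }
                out.collect(new AbstractMap.SimpleImmutableEntry<>(key, min));
            });
    }
}
```

The user function never has to re-extract the key from one of the grouped objects, which is the convenience this issue asks for.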


The same 




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
