Chesnay is right. Right now, it is not possible to do what you want in a straightforward way because Flink does not support fully sorting a data set (there are several related issues in JIRA).
A workaround would be to attach a constant value to each tuple, group on that constant (so that all tuples end up in the same group), sort that group, and apply the first operator.

2015-01-21 20:22 GMT+01:00 Chesnay Schepler <chesnay.schep...@fu-berlin.de>:

> If I remember correctly, first() returns the first n values for every
> group. The javadocs actually don't make this behaviour very clear.
>
>
> On 21.01.2015 19:18, Felix Neutatz wrote:
>
>> Hi,
>>
>> my use case is the following:
>>
>> I have a Tuple2<String,Long>. I want to group by the String and sum up the
>> Long values accordingly. This works fine with these lines:
>>
>> DataSet<Lineitem> lineitems = getLineitemDataSet(env);
>> lineitems.project(new int []{3,0}).groupBy(0).aggregate(Aggregations.SUM,
>> 1);
>>
>> After the aggregation I want to print the 10 groups with the highest sum,
>> like:
>>
>> string1, 100L
>> string2, 50L
>> string3, 1L
>>
>> I tried that:
>>
>> lineitems.project(new int []{3,0}).groupBy(0).aggregate(Aggregations.SUM,
>> 1).groupBy(0).sortGroup(1, Order.DESCENDING).first(3).print();
>>
>> But instead of 3 records, I get a lot more.
>>
>> Can you see my error?
>>
>> Best regards,
>>
>> Felix
>>
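
PS: To make the constant-key workaround from the top of this mail more concrete, here is a rough sketch in plain Java. It is only a model of the grouping semantics, not Flink code: the class TopNSketch, the helpers firstPerGroup and sampleSums, and the sample values are all made up for illustration, and Map.Entry<String, Long> merely stands in for Tuple2<String, Long>. It shows why grouping on the string again keeps every record, and why grouping on a constant turns "first n per group" into a global top n.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class TopNSketch {

    // Plain-Java model (an assumption, not the Flink implementation) of
    // groupBy(key).sortGroup(value, DESCENDING).first(n):
    // the first n records are taken from EVERY group, not from the whole input.
    static List<Map.Entry<String, Long>> firstPerGroup(
            List<Map.Entry<String, Long>> input,
            Function<Map.Entry<String, Long>, Object> keyOf,
            int n) {
        // Partition the records by key, keeping first-seen key order.
        Map<Object, List<Map.Entry<String, Long>>> groups = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : input) {
            groups.computeIfAbsent(keyOf.apply(e), k -> new ArrayList<>()).add(e);
        }
        Comparator<Map.Entry<String, Long>> byValueDesc =
                Comparator.comparingLong((Map.Entry<String, Long> e) -> e.getValue()).reversed();
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (List<Map.Entry<String, Long>> group : groups.values()) {
            group.sort(byValueDesc);                                  // sort inside the group
            out.addAll(group.subList(0, Math.min(n, group.size())));  // first n of this group
        }
        return out;
    }

    // Hypothetical result of the SUM aggregation: one record per distinct string.
    static List<Map.Entry<String, Long>> sampleSums() {
        return new ArrayList<>(Arrays.asList(
                new SimpleEntry<>("string1", 100L),
                new SimpleEntry<>("string2", 50L),
                new SimpleEntry<>("string3", 1L),
                new SimpleEntry<>("string4", 7L),
                new SimpleEntry<>("string5", 20L)));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> sums = sampleSums();
        // Grouping on the string again: every group holds exactly one record,
        // so "first 3 per group" returns all of them.
        System.out.println(firstPerGroup(sums, e -> e.getKey(), 3).size());
        // Grouping on a constant: a single group, so "first 3" is a global top 3.
        System.out.println(firstPerGroup(sums, e -> 0, 3));
    }
}
```

In the Flink job the constant would be an extra tuple field added after the aggregation and used as the grouping key. Keep in mind that all records then go to a single group, so this step is not parallel.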