Hi Viktor,

I had a look at your branch. First of all, it looks like very good work! Good code quality, lots of tests, well documented, nice! I like the first approach, ds.aggregate(min(1), max(2), count()), much better than the other one. It basically shows how the result tuple is constructed.
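To make the point about result-tuple construction concrete, here is a toy model of the first approach. This is illustrative only, not your actual branch code or Flink API; ResultTupleSketch, Agg, and the double[]-based "rows" are all made up for the sketch. It just shows why the argument order of aggregate(...) reads well: it directly determines the fields of the result tuple.

```java
import java.util.List;

// Toy model (not Flink code): each argument of aggregate(...) produces
// exactly one field of the result, in the same order as written.
public class ResultTupleSketch {
    interface Agg { double apply(List<double[]> rows); }

    static Agg min(int field) {
        return rows -> rows.stream().mapToDouble(r -> r[field]).min().getAsDouble();
    }
    static Agg max(int field) {
        return rows -> rows.stream().mapToDouble(r -> r[field]).max().getAsDouble();
    }
    static Agg count() {
        return rows -> rows.size();
    }

    // aggregate(...) builds one result field per aggregation argument
    static double[] aggregate(List<double[]> rows, Agg... aggs) {
        double[] result = new double[aggs.length];
        for (int i = 0; i < aggs.length; i++) {
            result[i] = aggs[i].apply(rows);
        }
        return result;
    }

    public static void main(String[] args) {
        List<double[]> rows = List.of(
            new double[] { 0, 1.0, 10.0 },
            new double[] { 0, 3.0, 20.0 });
        double[] res = aggregate(rows, min(1), max(2), count());
        // one field per aggregation, in argument order: [1.0, 20.0, 2.0]
        System.out.println(java.util.Arrays.toString(res));
    }
}
```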
I also have a few comments on the code and the overall approach:

- I would split the branch into two branches, one for each approach. That makes comparisons with master much easier.
- I am not sure about implicitly adding key fields when the user has not explicitly included them in the aggregation. It might confuse users if the return type looks different from what they have specified. How about having an allKeys() function that adds all keys of the grouping, and not adding keys by default?
- DataSet and UnorderedGrouping expose getter and setter methods for the AggregationOperatorFactory. These methods are public and therefore part of the user-facing API. Can you make them private or even remove them? They are not really necessary, right?
- The aggregation GroupReduceFunction should be combinable to make it perform better, especially in the case of aggregations on ungrouped data sets. It would be even better if you could convert the GroupReduceFunction into a functional-style ReduceFunction. These functions are always combinable and can be executed using a hash-aggregation strategy once we have that feature implemented (again, better performance). However, for that you would need pre- and postprocessing MapFunctions (to initialize and finalize the aggregates). On the other hand, you would only need three aggregation functions: sum, min, and max (count is a sum of ones, avg is sum/count). This design also eases the sharing of count for multiple avg aggregations.
- Some integration test cases would also be nice. See, for example, the tests in org.apache.flink.test.javaApiOperators.*
- We do not use @author tags in our code.
- Finally, we try to keep the documentation in sync with the code. Once your changes are ready for a PR, you should adapt the documentation in ./docs according to your changes (no need to do it at this point).

Please let me know if you have any questions.
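To illustrate the combinable reduce design I have in mind, here is a minimal sketch. Again, this is not Flink code; the class and method names (AggregationSketch, initialize, reduce, finish) and the double[] intermediate tuple are made-up stand-ins. The point is that with only sum, min, and max as primitives, the reduce step is associative (hence combinable), count falls out as a sum of ones, and avg is derived in the finalize step from the shared sum and count.

```java
import java.util.Arrays;

// Hypothetical sketch of combinable aggregation built from sum/min/max only.
public class AggregationSketch {
    // "initialize" MapFunction: map each input value v to the intermediate
    // tuple (sum, count-as-sum-of-ones, min, max)
    static double[] initialize(double v) {
        return new double[] { v, 1.0, v, v };
    }

    // functional-style, combinable "reduce": merge two intermediate tuples
    // field by field; associative, so it can also run as a combiner
    static double[] reduce(double[] a, double[] b) {
        return new double[] {
            a[0] + b[0],            // sum
            a[1] + b[1],            // count (sum of ones)
            Math.min(a[2], b[2]),   // min
            Math.max(a[3], b[3])    // max
        };
    }

    // "finalize" MapFunction: derive avg from the shared sum and count
    static double[] finish(double[] acc) {
        return new double[] { acc[0], acc[1], acc[2], acc[3], acc[0] / acc[1] };
    }

    public static void main(String[] args) {
        double[] acc = initialize(3.0);
        for (double v : new double[] { 1.0, 2.0 }) {
            acc = reduce(acc, initialize(v));
        }
        // sum=6.0, count=3.0, min=1.0, max=3.0, avg=2.0
        System.out.println(Arrays.toString(finish(acc)));
    }
}
```

Note that several avg aggregations over different fields could all read the same count field in the finalize step, which is the sharing I mentioned above.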
Cheers,
Fabian

2014-11-19 16:41 GMT+01:00 Viktor Rosenfeld <viktor.rosenf...@tu-berlin.de>:

> Hi everybody,
>
> I've created a GitHub branch for the new aggregation code:
> https://github.com/he-sk/incubator-flink/tree/aggregation
>
> I have implemented both of the APIs that I proposed earlier, so people can
> play around and decide which they like better:
>
> DataSet ds = ...
> ds.groupBy(0).aggregate(min(1), max(1), count())
>
> And:
>
> DataSet ds = ...
> ds.groupBy(0).min(1).max(1).count().aggregate()
>
> The second version is a thin layer on the first version.
>
> The aggregation functions min, max, sum, count, and average are supported.
> For groupings, you can select the group keys with (multiple) key()
> pseudo-aggregation functions. By default, all group keys are used.
>
> You can find examples in AggregationApi1Test.java and
> AggregationApi2Test.java.
>
> Right now, only the Java API uses the new aggregation code. I've only
> started learning Scala so I don't know how easy it will be to port the new
> API. One problem that I foresee is that the type information of the input
> tuples is lost. Therefore, the Scala compiler cannot do type inference on
> the output tuple. I hope that this can be fixed or worked around by simply
> specifying the output tuple type directly.
>
> I've kept the old aggregation API but marked it deprecated and renamed some
> functions.
>
> The next steps would be:
>
> 1) Implement Scala API.
> 2) Add support for POJOs (sync with streaming aggregations for that).
>
> Looking forward to your input.
>
> Best,
> Viktor
>
> --
> View this message in context:
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Hi-Aggregation-support-tp2311p2547.html
> Sent from the Apache Flink (Incubator) Mailing List archive. mailing list
> archive at Nabble.com.