Hi Viktor,

I had a look at your branch.
First of all, it looks like very good work! Good code quality, lots of
tests, well documented, nice!
I like the first approach (ds.aggregate(min(1), max(2), count())) much
better than the other one. It directly shows how the result tuple is
constructed.
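
To illustrate what I mean (a hypothetical sketch based on the examples in
your mail, not tested against the branch, and ignoring the implicitly added
key fields for the moment): each argument to aggregate() contributes one
field to the result tuple, in order:

```java
// Hypothetical usage of the first proposed API; the concrete result type
// (Tuple3<...>) is my assumption, not taken from the branch.
DataSet<Tuple3<Integer, Integer, Long>> result =
    ds.groupBy(0).aggregate(min(1), max(2), count());
// f0 = min of field 1, f1 = max of field 2, f2 = count of rows in the group
```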

I also have a few comments on the code and the overall approach:
- I would split the branch into two branches, one for each approach. That
makes comparisons with master much easier.
- I am not sure about the implicit adding of key fields if they are not
explicitly added by the user in the aggregation. It might confuse users if
the return type looks different from what they have specified. How about
having an allKeys() function that adds all keys of the grouping and not
adding keys by default?
- DataSet and UnorderedGrouping expose getter and setter methods for the
AggregationOperatorFactory. These methods are public and therefore part of
the user-facing API. Can you make them private or even remove them? They
are not really necessary, right?
- The aggregation GroupReduceFunction should be combinable to make it
perform better, especially for aggregations on ungrouped data sets. It
would be even better if you could convert the GroupReduceFunction into a
functional-style ReduceFunction. These functions are always combinable and
can be executed using a hash-aggregation strategy once we have that feature
implemented (again, better performance). However, for that you would need
pre- and post-processing MapFunctions (to initialize and finalize the
aggregates). On the other hand, you then only need three aggregation
functions: sum, min, and max (count is a sum of ones, avg is sum/count).
This design also makes it easy to share the count among multiple avg
aggregations.
- Some integration test cases would also be nice. See for example the tests
in org.apache.flink.test.javaApiOperators.*
- We do not use @author tags in our code.
- Finally, we try to keep the documentation in sync with the code. Once
your changes are ready for a PR, you should adapt the documentation in
./docs according to your changes (no need to do it at this point).
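
To make the map -> reduce -> map idea concrete, here is a minimal,
self-contained sketch in plain Java (no Flink classes; the accumulator
layout {sum, min, max, count} and all names are my own assumptions, not
taken from your branch):

```java
import java.util.Arrays;
import java.util.List;

public class AggregationSketch {

    // "initialize": map each input value to an accumulator
    // (the pre-processing MapFunction)
    static double[] initialize(double v) {
        return new double[] { v, v, v, 1.0 };
    }

    // "combine": merge two accumulators (the functional-style
    // ReduceFunction; associative and commutative, hence combinable)
    static double[] combine(double[] a, double[] b) {
        return new double[] {
            a[0] + b[0],          // sum
            Math.min(a[1], b[1]), // min
            Math.max(a[2], b[2]), // max
            a[3] + b[3]           // count = sum of ones
        };
    }

    // "finalize": derive the requested aggregates (the post-processing
    // MapFunction); avg = sum / count, so multiple avg aggregations
    // can share one count field
    static double finalizeAvg(double[] acc) {
        return acc[0] / acc[3];
    }

    public static void main(String[] args) {
        List<Double> values = Arrays.asList(3.0, 1.0, 4.0, 1.0, 5.0);
        double[] acc = values.stream()
                .map(AggregationSketch::initialize)
                .reduce(AggregationSketch::combine)
                .get();
        System.out.println("min=" + acc[1] + " max=" + acc[2]
                + " count=" + (long) acc[3] + " avg=" + finalizeAvg(acc));
    }
}
```

Because combine is associative and commutative, partial accumulators can be
merged in any order, which is exactly what makes the reduce combinable and
amenable to a hash-aggregation strategy.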

Please let me know if you have any questions.

Cheers, Fabian




2014-11-19 16:41 GMT+01:00 Viktor Rosenfeld <viktor.rosenf...@tu-berlin.de>:

> Hi everybody,
>
> I've created a GitHub branch for the new aggregation code:
> https://github.com/he-sk/incubator-flink/tree/aggregation
>
> I have implemented both of the APIs that I proposed earlier, so people can
> play around and decide which they like better:
>
>   DataSet ds = ...
>   ds.groupBy(0).aggregate(min(1), max(1), count())
>
> And:
>
>   DataSet ds = ...
>   ds.groupBy(0).min(1).max(1).count().aggregate()
>
> The second version is a thin layer on the first version.
>
> The aggregation functions min, max, sum, count, and average are supported.
> For groupings, you can select the group keys with (multiple) key()
> pseudo-aggregation functions. By default, all group keys are used.
>
> You can find examples in AggregationApi1Test.java and
> AggregationApi2Test.java.
>
> Right now, only the Java API uses the new aggregation code. I've only
> started learning Scala so I don't know how easy it will be to port the new
> API. One problem that I foresee is that the type information of the input
> tuples is lost. Therefore, the Scala compiler cannot do type inference on
> the output tuple. I hope that this can be fixed or worked around by simply
> specifying the output tuple type directly.
>
> I've kept the old aggregation API but marked it deprecated and renamed some
> functions.
>
> The next steps would be:
>
> 1) Implement Scala API.
> 2) Add support for POJOs (sync with streaming aggregations for that).
>
> Looking forward to your input.
>
> Best,
> Viktor
>
>
>
> --
> View this message in context:
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Hi-Aggregation-support-tp2311p2547.html
> Sent from the Apache Flink (Incubator) Mailing List archive. mailing list
> archive at Nabble.com.
>
