Hi Viktor, thanks for the update!
Regarding the explicit vs. implicit adding of key fields: I would only allow using either key(x) or allKeys() and throw an exception if they are mixed. I guess there won't be many situations where somebody would want to mix them anyway. No need to complicate the logic and semantics for this corner case, IMO.

Not sharing the result of two identical operations is fine. Whoever computes ds.aggregate(min(0), min(0)) deserves the overhead ;-)

We have the goal of keeping the Java and Scala APIs in sync at all times. You can make a pull request with only the Java changes, but it won't be merged until you (or somebody else) have adapted the Scala API. I would say, open the PR and start a discussion about it. That way, everybody can review the code more easily.

I'll have a detailed look at the changes later.

Cheers, Fabian

2014-11-27 11:20 GMT+01:00 Viktor Rosenfeld <viktor.rosenf...@tu-berlin.de>:

> Hi Fabian,
>
> thanks for your feedback. See my responses below.
>
>
> Fabian Hueske wrote
> > - I would split the branch into two branches, one for each approach. That
> > makes comparisons with master much easier.
>
> I've moved the changes necessary for the second approach to a branch called
> aggregation-alt:
> https://github.com/he-sk/incubator-flink/tree/aggregation-alt
>
>
> Fabian Hueske wrote
> > - I am not sure about the implicit adding of key fields if they are not
> > explicitly added by the user in the aggregation. It might confuse users
> > if the return type looks different from what they have specified. How
> > about having an allKeys() function that adds all keys of the grouping
> > and not adding keys by default?
>
> Done. But I'm not sure about it.
>
> It is not very clear where in the result the key fields should be added.
> The old code added them at the beginning. I'm now inserting them at the
> position where the allKeys() function is called, except for those keys
> that are explicitly specified elsewhere.
> All in all, I think that the semantics are very opaque.
>
>
> Fabian Hueske wrote
> > - DataSet and UnorderedGrouping expose getter and setter methods for the
> > AggregationOperatorFactory. These methods are public and therefore face
> > the user API. Can you make them private or even remove them? They are
> > not really necessary, right?
>
> I need the setter to test the delegation in DataSet.aggregate(). The test
> is fairly trivial, but now that it's there, why remove it? I've made the
> getters and setters package-private.
>
>
> Fabian Hueske wrote
> > - The aggregation GroupReduceFunction should be combinable to make it
> > perform better, esp. in case of aggregations on ungrouped datasets. It
> > would be even better if you could convert the GroupReduceFunction into
> > a functional-style ReduceFunction. These functions are always combinable
> > and can be executed using a hash-aggregation strategy once we have that
> > feature implemented (again, better performance). However, for that you
> > would need to have pre- and postprocessing MapFunctions (initialize and
> > finalize of aggregates). On the other hand, you only need three
> > aggregation functions: sum, min, and max (count is a sum of ones, avg is
> > sum/count). This design also eases the sharing of count for multiple avg
> > aggregations.
>
> The GroupReduceFunction cannot be made combinable because it changes the
> output tuple type. CombineFunction.combine() requires that the input and
> the output types are the same.
>
> I changed the implementation to use two MapFunctions and a ReduceFunction.
>
> Also, I implemented average so that it picks up an existing count and sum.
> However, if the same function is specified multiple times (e.g., two calls
> to min(0) in one aggregate) it won't be reused. The reason is that every
> function stores only one position of the result in the output tuple.
> (But two average(0) functions will use the same count and sum functions
> because the results of count and sum are not represented in the output
> tuple.)
>
>
> Fabian Hueske wrote
> > - Some integration test cases would also be nice. See, for example, the
> > tests in org.apache.flink.test.javaApiOperators.*
>
> I've copied the tests in AggregateITCase and SumMinMaxITCase for that.
>
>
> Fabian Hueske wrote
> > - We do not use @author tags in our code.
>
> Removed.
>
>
> Fabian Hueske wrote
> > - Finally, we try to keep the documentation in sync with the code. Once
> > your changes are ready for a PR, you should adapt the documentation in
> > ./docs according to your changes (no need to do it at this point).
> >
> > Please let me know if you have any questions.
>
> Do you think that the implementation of the Scala API is necessary for a
> pull request? Or should I create a pull request from the current code?
>
> Cheers,
> Viktor
>
>
>
>
> --
> View this message in context:
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Hi-Aggregation-support-tp2311p2643.html
> Sent from the Apache Flink (Incubator) Mailing List archive. mailing list
> archive at Nabble.com.
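P.S. To make the two-MapFunctions-plus-ReduceFunction design discussed above concrete, here is a minimal plain-Java sketch of the average case. It has no Flink dependency, and all names (AvgSketch, Acc, preMap, reduce, postMap) are illustrative, not actual Flink API: the pre-map lifts each value into a (sum, count) accumulator, the reduce merges accumulators without changing their type, and the post-map finalizes the average from the shared sum and count.

```java
import java.util.List;

// Illustrative stand-in for the pipeline described above: pre-map -> reduce -> post-map.
// Plain Java, not the Flink API; names are hypothetical.
class AvgSketch {

    // Accumulator emitted by the pre-map: a running sum and a count.
    static final class Acc {
        final double sum;
        final long count;
        Acc(double sum, long count) { this.sum = sum; this.count = count; }
    }

    // Pre-map ("initialize"): lift a single value into an accumulator.
    static Acc preMap(double v) {
        return new Acc(v, 1L);
    }

    // Reduce: merge two accumulators. Input and output types are identical and
    // the operation is associative, so a combiner may pre-aggregate partitions.
    static Acc reduce(Acc a, Acc b) {
        return new Acc(a.sum + b.sum, a.count + b.count);
    }

    // Post-map ("finalize"): turn the shared (sum, count) pair into the average.
    static double postMap(Acc a) {
        return a.sum / a.count;
    }

    // Drive the three stages sequentially, as a runtime would per group.
    static double average(List<Double> values) {
        Acc acc = null;
        for (double v : values) {
            Acc mapped = preMap(v);
            acc = (acc == null) ? mapped : reduce(acc, mapped);
        }
        return postMap(acc); // kept simple: fails on an empty group
    }
}
```

The point of the detour through the accumulator type is exactly the combinability argument: because reduce() is type-preserving and associative, partial results from different partitions can be merged in any order before the final post-map, which a type-changing GroupReduceFunction cannot offer.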