Viktor said he changed the implementation to MapFunction -> ReduceFunction -> MapFunction.
So it is combinable :-)

2014-11-27 11:45 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
> Hi,
> why does the GroupReduce change the output type? Can this not be done
> in the two mappers? In my opinion, aggregations should be combinable;
> otherwise, performance would be severely crippled.
>
> Cheers,
> Aljoscha
>
> On Thu, Nov 27, 2014 at 11:20 AM, Viktor Rosenfeld
> <viktor.rosenf...@tu-berlin.de> wrote:
> > Hi Fabian,
> >
> > thanks for your feedback. See my responses below.
> >
> > Fabian Hueske wrote
> >> - I would split the branch into two branches, one for each approach. That
> >> makes comparisons with master much easier.
> >
> > I've moved the changes necessary for the second approach to a branch called
> > aggregation-alt:
> > https://github.com/he-sk/incubator-flink/tree/aggregation-alt
> >
> > Fabian Hueske wrote
> >> - I am not sure about the implicit adding of key fields if they are not
> >> explicitly added by the user in the aggregation. It might confuse users if
> >> the return type looks different from what they have specified. How about
> >> having an allKeys() function that adds all keys of the grouping and not
> >> adding keys by default?
> >
> > Done. But I'm not sure about it.
> >
> > It is not very clear where in the result the key fields should be added. The
> > old code added them at the beginning. I'm now inserting them at the position
> > where the allKeys() function is called, except for those keys that are
> > explicitly specified elsewhere. All in all, I think that the semantics are
> > very opaque.
> >
> > Fabian Hueske wrote
> >> - DataSet and UnorderedGrouping expose getter and setter methods for the
> >> AggregationOperatorFactory. These methods are public and therefore facing
> >> the user API. Can you make them private or even remove them? They are not
> >> really necessary, right?
> >
> > I need the setter to test the delegation in DataSet.aggregate(). The test is
> > fairly trivial, but now that it's there, why remove it? I've made the getters
> > and setters package private.
> >
> > Fabian Hueske wrote
> >> - The aggregation GroupReduceFunction should be combinable to make it
> >> perform better, esp. in case of aggregations on ungrouped datasets. It
> >> would be even better if you could convert the GroupReduceFunction into a
> >> functional-style ReduceFunction. These functions are always combinable and
> >> can be executed using a hash-aggregation strategy once we have that feature
> >> implemented (again better performance). However, for that you would need to
> >> have pre- and postprocessing MapFunctions (initialize and finalize of
> >> aggregates). On the other hand, you only need three aggregation functions:
> >> sum, min, and max (count is sum of ones, avg is sum/count). This design
> >> also eases the sharing of count for multiple avg aggregations.
> >
> > The GroupReduce cannot be made combinable because it changes the output tuple
> > type. CombineFunction.combine() requires that both the input and the output
> > type are the same.
> >
> > I changed the implementation to use 2 MapFunctions and a ReduceFunction.
> >
> > Also, I implemented average so that it picks up an existing count and sum.
> > However, if the same function is specified multiple times (e.g., 2 calls to
> > min(0) in one aggregate), it won't be reused. The reason is that every
> > function stores only one position of the result in the output tuple. (But two
> > average(0) functions will use the same count and sum functions because the
> > result of count and sum is not represented in the output tuple.)
> >
> > Fabian Hueske wrote
> >> - Some integration test cases would also be nice. See for example the tests
> >> in org.apache.flink.test.javaApiOperators.*
> >
> > I've copied the tests in AggregateITCase and SumMinMaxITCase for that.
> >
> > Fabian Hueske wrote
> >> - We do not use @author tags in our code.
> >
> > Removed.
> >
> > Fabian Hueske wrote
> >> - Finally, we try to keep the documentation in sync with the code. Once
> >> your changes are ready for a PR, you should adapt the documentation in
> >> ./docs according to your changes (no need to do it at this point).
> >>
> >> Please let me know if you have any questions.
> >
> > Do you think that for a pull request the implementation of the Scala API is
> > necessary? Or should I create a pull request from the current code?
> >
> > Cheers,
> > Viktor
> >
> > --
> > View this message in context:
> > http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Hi-Aggregation-support-tp2311p2643.html
> > Sent from the Apache Flink (Incubator) mailing list archive at Nabble.com.
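[Editor's note: the MapFunction -> ReduceFunction -> MapFunction design discussed above can be sketched in plain Java. This is not the actual Flink code from Viktor's branch; the class and method names below are illustrative. It shows why the pattern is combinable: the reduce step maps two intermediate aggregates of the same type to one aggregate of that type, with average finalized as sum/count in a post-processing map.]

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch of the map -> reduce -> map aggregation pattern,
// using average as the example aggregate. Names are hypothetical.
public class AvgSketch {

    // Intermediate aggregate: sum and count are kept together, so one
    // (sum, count) pair can be shared by multiple avg aggregations.
    static final class SumCount {
        final double sum;
        final long count;
        SumCount(double sum, long count) { this.sum = sum; this.count = count; }
    }

    // Pre-processing map: initialize the aggregate for a single element.
    static SumCount init(double value) {
        return new SumCount(value, 1L);
    }

    // Reduce: combine two partial aggregates. Input and output types are
    // identical, which is what makes this step combinable (in contrast to
    // a GroupReduce that changes the output tuple type).
    static SumCount combine(SumCount a, SumCount b) {
        return new SumCount(a.sum + b.sum, a.count + b.count);
    }

    // Post-processing map: finalize the aggregate into the result type.
    static double finalizeAvg(SumCount acc) {
        return acc.sum / acc.count;
    }

    public static void main(String[] args) {
        List<Double> data = Arrays.asList(1.0, 2.0, 3.0, 4.0);
        SumCount acc = data.stream()
                .map(AvgSketch::init)
                .reduce(AvgSketch::combine)
                .orElseThrow(IllegalStateException::new);
        System.out.println(finalizeAvg(acc)); // prints 2.5
    }
}
```

Because combine() is associative, a runtime is free to apply it eagerly to partial results on each node before shuffling, which is exactly the pre-aggregation benefit the thread is after.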