Hi Fabian,

Thanks for your feedback. See my responses below.


Fabian Hueske wrote
> - I would split the branch into two branches, one for each approach. That
> make comparisons with master much easier.

I've moved the changes necessary for the second approach to a branch called
aggregation-alt:
https://github.com/he-sk/incubator-flink/tree/aggregation-alt


Fabian Hueske wrote
> - I am not sure about the implicit adding of key fields if they are not
> explicitly added by the user in the aggregation. It might confuse users if
> the return type looks different from what they have specified. How about
> having an allKeys() function that adds all keys of the grouping and not
> adding keys by default?

Done. But I'm not sure about it. 

It is not very clear where in the result the key fields should be added.
The old code added them at the beginning. I'm now inserting them at the
position where the allKeys() function is called, except for those keys
that are explicitly specified elsewhere. All in all, I think the
semantics are rather opaque.
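
To make this concrete, here is a rough example of what I mean (method
names as in my branch, but the exact result layout is only my assumption
of how the current code behaves):

  // Grouping a Tuple3<String, Integer, Double> on field 0.
  // min(1) and average(2) are requested explicitly; allKeys() is called
  // in between, so key field 0 ends up between the two aggregates:
  //   result = (min of f1, key f0, avg of f2)
  input.groupBy(0).aggregate(min(1), allKeys(), average(2));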


Fabian Hueske wrote
> - DataSet and UnorderedGrouping expose getter and setter methods for the
> AggregationOperatorFactory. These methods are public and therefore facing
> the user API. Can you make them private or even remove them. They are not
> really necessary, right?

I need the setter to test the delegation in DataSet.aggregate(). The test
is fairly trivial, but now that it's there, why remove it? I've made the
getters and setters package-private.


Fabian Hueske wrote
> - The aggregation GroupReduceFunction should be combinable to make it
> perform better, esp. in case of aggregations on ungrouped datasets. It
> would be even better, if you could convert the GroupReduceFunction into a
> functional-style ReduceFunction. These function are always combinable and
> can be executed using a hash-aggregation strategy once we have that
> feature
> implemented (again better performance). However, for that you would need
> to
> have a pre- and postprocessing MapFunctions (initialize and finalize of
> aggregates). On the other hand, you only need three aggregation functions
> sum, min, and max (count is sum of ones, avg is sum/count). This design
> also eases the sharing of count for multiple avg aggregations.

The GroupReduce cannot be made combinable because it changes the output
tuple type. CombineFunction.combine() requires that both the input and
the output type are the same.

I changed the implementation to use 2 MapFunctions and a ReduceFunction.

Also, I implemented average so that it picks up an existing count and
sum. However, if the same function is specified multiple times (e.g.,
two calls to min(0) in one aggregate), it won't be reused, because each
function stores only one position for its result in the output tuple.
(Two average(0) functions, on the other hand, will share the same count
and sum functions, because the results of count and sum are not
themselves represented in the output tuple.)
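
To spell the decomposition out in plain Java (independent of the actual
Flink interfaces; the class and field names below are only illustrative,
not the ones in my branch):

  import java.util.Arrays;
  import java.util.List;

  public class AvgDecompositionSketch {

      // Intermediate record produced by the first MapFunction: (sum, count).
      // avg is not materialized here; it is derived from sum and count in
      // the final map, which is why two average(0) calls can share the
      // same intermediates.
      static final class Acc {
          final double sum;
          final long count;
          Acc(double sum, long count) { this.sum = sum; this.count = count; }
      }

      public static void main(String[] args) {
          List<Double> values = Arrays.asList(1.0, 2.0, 4.0);

          // 1) "initialize" map: each element becomes (value, 1)
          // 2) reduce: element-wise addition; input and output type stay
          //    the same, so this step is trivially combinable
          Acc acc = values.stream()
                  .map(v -> new Acc(v, 1L))
                  .reduce(new Acc(0.0, 0L),
                          (a, b) -> new Acc(a.sum + b.sum, a.count + b.count));

          // 3) "finalize" map: compute the fields exposed in the output tuple
          System.out.println("sum=" + acc.sum
                  + " count=" + acc.count
                  + " avg=" + (acc.sum / acc.count));
      }
  }

Because the reduce step keeps the same input and output type, it should
also be executable with a hash-aggregation strategy once that is
available, as you pointed out.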


Fabian Hueske wrote
> - Some integration test cases would also be nice. See for example the
> tests
> in org.apache.flink.test.javaApiOperators.*

I've copied the tests in AggregateITCase and SumMinMaxITCase for that.


Fabian Hueske wrote
> - We do not use @author tags in our code.

Removed.


Fabian Hueske wrote
> - Finally, we try to keep the documentation in sync with the code. Once
> your changes are ready for a PR, you should adapt the documenation in
> ./docs according to your changes (no need to do it at this point).
> 
> Please let me know if you have any questions.

Do you think the Scala API implementation is necessary for a pull
request, or should I create one from the current code?

Cheers,
Viktor



