Hi Fabian,

I ran into a problem with your syntax example:

DataSet<Tuple2<String, Integer>> ds = ...
DataSet<Tuple4<Tuple2<String, Integer>, Integer, Integer, Long>> result =
    ds.groupBy(0).min(1).andMax(1).andCnt();

Basically, in the example above we don't know how long the chain of
aggregation method calls is. Also, each aggregation method call adds a
field to the result tuple (the first call to groupBy returns a
Tuple1). Because the resultType of an operator is specified in the
constructor, every one of those method calls needs to create a new
Operator<OUT> with the correct result type. However, only the
translateToDataFlow method of the last method call in the chain should
actually compute the aggregation.

This can be achieved by testing if an aggregation method is called on
an AggregationOperator. The translateToDataFlow method of the
operators in the start/middle of the chain would then just return a
MapOperatorBase which simply extends the tuple. The
translateToDataFlow method of the last operator in the chain would
return a GroupReduceOperatorBase.
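
To make the chained variant concrete, here is a rough sketch in plain
Java. It does not use the Flink API: AggregationOp, lastInChain and
chain are made-up names, and translateToDataFlow only returns the name
of the node kind it would produce instead of building it. The starting
arity of 2 assumes that groupBy yields a Tuple1 and that min adds one
field, as described above.

```java
public class ChainSketch {

    /** Hypothetical stand-in for an operator whose result arity is fixed at construction. */
    static class AggregationOp {
        final AggregationOp input;   // previous aggregation in the chain, or null
        final String function;       // e.g. "min(1)", used for printing only
        final int resultArity;       // number of fields in the result tuple
        boolean lastInChain = true;  // cleared once another aggregation is chained on

        AggregationOp(AggregationOp input, String function, int resultArity) {
            this.input = input;
            this.function = function;
            this.resultArity = resultArity;
        }

        // Called on an existing AggregationOp, so the receiver is no longer the last one.
        private AggregationOp chain(String function) {
            lastInChain = false;
            return new AggregationOp(this, function, resultArity + 1);
        }

        AggregationOp andMax(int field) { return chain("max(" + field + ")"); }

        AggregationOp andCnt() { return chain("cnt()"); }

        // Returns only the name of the node kind that would be produced.
        String translateToDataFlow() {
            return lastInChain ? "GroupReduceOperatorBase" : "MapOperatorBase";
        }
    }

    // First aggregation after groupBy: the Tuple1 from groupBy plus one new field.
    static AggregationOp min(int field) {
        return new AggregationOp(null, "min(" + field + ")", 2);
    }

    public static void main(String[] args) {
        AggregationOp last = min(1).andMax(1).andCnt();
        // Walk the chain from the last operator back to the first.
        for (AggregationOp op = last; op != null; op = op.input) {
            System.out.println(op.function + ": arity " + op.resultArity
                    + ", translates to " + op.translateToDataFlow());
        }
    }
}
```

Every call in the chain allocates a new operator with a wider result,
and only the last one ends up as the group reduce.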

This strategy seems very hackish and involves lots of unnecessary
copying of tuple data. I think a better way would be to use the
following syntax:

ds.groupBy(0).aggregate(min(1), max(1), cnt())

or

ds.groupBy(0).min(1).max(1).cnt().aggregate()

Here, there is only one method which creates a new operator, the
aggregate method, and the final resultType is known when aggregate is
called.
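
As an illustration of the first variant, here is a minimal sketch in
plain Java, again without the Flink API. Rows are modelled as Object[]
and the Aggregation interface, min, max, cnt and aggregate are
hypothetical helpers, not existing Flink methods. For simplicity the
result row holds the grouping key followed by one field per
aggregation, rather than the nested tuple from the example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AggregateSketch {

    /** Hypothetical descriptor: folds the rows of one group into a single value. */
    interface Aggregation {
        Object init();
        Object add(Object acc, Object[] row);
    }

    static Aggregation min(int field) {
        return new Aggregation() {
            public Object init() { return null; }
            public Object add(Object acc, Object[] row) {
                int v = (Integer) row[field];
                return acc == null ? v : Math.min((Integer) acc, v);
            }
        };
    }

    static Aggregation max(int field) {
        return new Aggregation() {
            public Object init() { return null; }
            public Object add(Object acc, Object[] row) {
                int v = (Integer) row[field];
                return acc == null ? v : Math.max((Integer) acc, v);
            }
        };
    }

    static Aggregation cnt() {
        return new Aggregation() {
            public Object init() { return 0L; }
            public Object add(Object acc, Object[] row) { return (Long) acc + 1; }
        };
    }

    // The only place where a result is built: its arity is 1 + aggs.length.
    static List<Object[]> aggregate(List<Object[]> rows, int keyField, Aggregation... aggs) {
        Map<Object, Object[]> accs = new LinkedHashMap<>();
        for (Object[] row : rows) {
            Object[] acc = accs.get(row[keyField]);
            if (acc == null) {
                acc = new Object[aggs.length];
                for (int i = 0; i < aggs.length; i++) acc[i] = aggs[i].init();
                accs.put(row[keyField], acc);
            }
            for (int i = 0; i < aggs.length; i++) acc[i] = aggs[i].add(acc[i], row);
        }
        List<Object[]> result = new ArrayList<>();
        for (Map.Entry<Object, Object[]> e : accs.entrySet()) {
            Object[] out = new Object[1 + aggs.length];
            out[0] = e.getKey();
            System.arraycopy(e.getValue(), 0, out, 1, aggs.length);
            result.add(out);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Object[]> rows = Arrays.asList(
                new Object[]{"a", 3}, new Object[]{"b", 5}, new Object[]{"a", 1});
        for (Object[] out : aggregate(rows, 0, min(1), max(1), cnt())) {
            System.out.println(Arrays.toString(out));
        }
    }
}
```

All aggregations are handed over in one call, so a single operator can
be created with the final result type and nothing has to be copied
between intermediate steps.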

What do you think?

Best,
Viktor



--
View this message in context: 
http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Hi-Aggregation-support-tp2311p2429.html
Sent from the Apache Flink (Incubator) Mailing List archive at Nabble.com.