Hi Fabian,

I ran into a problem with your syntax example:
DataSet<Tuple2<String, Integer>> ds = ...
DataSet<Tuple4<Tuple2<String, Integer>, Integer, Integer, Long>> result =
    ds.groupBy(0).min(1).andMax(1).andCnt();

Basically, in the example above we don't know how long the chain of aggregation method calls is. Also, each aggregation method call adds a field to the result tuple (the first aggregation call after groupBy returns a Tuple1). Because the resultType of an operator is specified in the constructor, every one of those method calls needs to create a new Operator<OUT> with the correct result type. However, only the translateToDataFlow method of the last call in the chain should actually compute the aggregation. This could be achieved by testing whether an aggregation method is called on an AggregationOperator: the translateToDataFlow method of the operators at the start and in the middle of the chain would then just return a MapOperatorBase that simply extends the tuple by one field, while the translateToDataFlow method of the last operator in the chain would return a GroupReduceOperatorBase. This strategy seems very hackish and involves a lot of unnecessary copying of tuple data.

I think a better way would be the following syntax:

ds.groupBy(0).aggregate(min(1), max(1), cnt())

or

ds.groupBy(0).min(1).max(1).cnt(1).aggregate()

Here, only one method creates a new operator, the aggregate method, and the final resultType is known when aggregate is called.

What do you think?

Best,
Viktor
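P.S. To make the idea concrete, here is a minimal, self-contained sketch of the descriptor-based variant. All names in it (AggregateSketch, Agg, aggregateGroup) are made up for illustration, not actual Flink API; the point is only that min/max/cnt record *what* to compute, and the single aggregate step knows the final result arity before it builds anything:

    import java.util.ArrayList;
    import java.util.List;

    public class AggregateSketch {

        enum Op { MIN, MAX, CNT }

        // Descriptor: records which aggregation to apply to which field.
        // Unlike the chained andMax()/andCnt() variant, creating one of
        // these does not create an operator.
        static class Agg {
            final Op op; final int field;
            Agg(Op op, int field) { this.op = op; this.field = field; }
        }
        static Agg min(int field) { return new Agg(Op.MIN, field); }
        static Agg max(int field) { return new Agg(Op.MAX, field); }
        static Agg cnt()          { return new Agg(Op.CNT, -1); }

        // Stand-in for the translateToDataFlow of the single aggregate
        // operator: the result width (aggs.length + 1 for the key) is
        // known up front, and there are no intermediate tuple-widening
        // map steps.
        static Object[] aggregateGroup(String key, List<int[]> group, Agg... aggs) {
            Object[] row = new Object[aggs.length + 1];
            row[0] = key;
            for (int i = 0; i < aggs.length; i++) {
                Agg a = aggs[i];
                switch (a.op) {
                    case CNT:
                        row[i + 1] = (long) group.size();
                        break;
                    case MIN: {
                        int m = Integer.MAX_VALUE;
                        for (int[] t : group) m = Math.min(m, t[a.field]);
                        row[i + 1] = m;
                        break;
                    }
                    case MAX: {
                        int m = Integer.MIN_VALUE;
                        for (int[] t : group) m = Math.max(m, t[a.field]);
                        row[i + 1] = m;
                        break;
                    }
                }
            }
            return row;
        }

        public static void main(String[] args) {
            // One already-formed group (key "a") with two records;
            // field 1 holds the value to aggregate.
            List<int[]> group = new ArrayList<int[]>();
            group.add(new int[] { 0, 3 });
            group.add(new int[] { 0, 7 });
            // Corresponds to ds.groupBy(0).aggregate(min(1), max(1), cnt())
            // applied to this one group:
            Object[] row = aggregateGroup("a", group, min(1), max(1), cnt());
            System.out.println(java.util.Arrays.toString(row)); // [a, 3, 7, 2]
        }
    }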