Here's a ticket: https://issues.apache.org/jira/browse/SPARK-15598
On Fri, May 20, 2016 at 12:35 AM, Reynold Xin <r...@databricks.com> wrote:

> Andres - this is great feedback. Let me think about it a little bit more
> and reply later.
>
> On Thu, May 19, 2016 at 11:12 AM, Andres Perez <and...@tresata.com> wrote:
>
>> Hi all,
>>
>> We were in the process of porting an RDD program to one which uses
>> Datasets. Most things were easy to transition, but one hole in
>> functionality we found was the ability to reduce a Dataset by key,
>> something akin to PairRDDFunctions.reduceByKey. Our first attempt at
>> adding the functionality ourselves involved creating a
>> KeyValueGroupedDataset and calling reduceGroups to get the reduced
>> Dataset.
>>
>> class RichPairDataset[K, V: ClassTag](val ds: Dataset[(K, V)]) {
>>   def reduceByKey(func: (V, V) => V)(implicit e1: Encoder[K],
>>       e2: Encoder[V], e3: Encoder[(K, V)]): Dataset[(K, V)] =
>>     ds.groupByKey(_._1)
>>       .reduceGroups { (tup1, tup2) => (tup1._1, func(tup1._2, tup2._2)) }
>>       .map { case (k, (_, v)) => (k, v) }
>> }
>>
>> Note that the function passed into .reduceGroups takes in the key-value
>> pair. It'd be nicer to pass in a function that maps just the values, i.e.
>> reduceGroups(func). This would require the ability to modify the values of
>> the KeyValueGroupedDataset (which is returned by the .groupByKey call on a
>> Dataset). Such a function (e.g., KeyValueGroupedDataset.mapValues(func: V
>> => U)) does not currently exist.
>>
>> The more important issue, however, is the inefficiency of .reduceGroups.
>> The function does not support partial aggregation (reducing map-side), and
>> as a result requires shuffling all the data in the Dataset. A more
>> efficient alternative that we explored involved creating a Dataset from
>> the KeyValueGroupedDataset by creating an Aggregator and passing it as a
>> TypedColumn to KeyValueGroupedDataset's .agg function. Unfortunately, the
>> Aggregator necessitated the creation of a zero to create a valid monoid.
>> However, the zero is dependent on the reduce function. The zero for a
>> function such as addition on Ints would be different from the zero for
>> taking the minimum over Ints, for example. The Aggregator requires that we
>> not break the rule of reduce(a, zero) == a. To do this we had to create an
>> Aggregator with a buffer type that stores the value along with a null flag
>> (using Scala's nice Option syntax yielded some mysterious errors that I
>> haven't worked through yet, unfortunately), used by the zero element to
>> signal that it should not participate in the reduce function.
>>
>> -Andy
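
For readers of the ticket, the flag-in-buffer idea described in the quoted message can be sketched roughly as follows. This is a Spark-free illustration only, not the code from the thread: the names ReduceBuffer, ReduceAggregator and ReduceDemo are hypothetical, and the methods merely mirror the zero / reduce / merge / finish shape of Spark's Aggregator without extending it or supplying encoders.

```scala
// Hypothetical sketch: these names are illustrative and not part of Spark.

// Buffer that carries a value plus a flag saying whether a real value is present.
final case class ReduceBuffer[V](isEmpty: Boolean, value: V)

// Mirrors the zero / reduce / merge / finish shape of an aggregator.
final class ReduceAggregator[V](func: (V, V) => V) {
  // The zero holds no real value; its null placeholder is never passed to func.
  def zero: ReduceBuffer[V] =
    ReduceBuffer(isEmpty = true, value = null.asInstanceOf[V])

  // Fold one input into a buffer (the map-side, partial-aggregation step).
  def reduce(b: ReduceBuffer[V], a: V): ReduceBuffer[V] =
    if (b.isEmpty) ReduceBuffer(isEmpty = false, value = a)
    else ReduceBuffer(isEmpty = false, value = func(b.value, a))

  // Combine two partial buffers; an empty buffer acts as the identity.
  def merge(b1: ReduceBuffer[V], b2: ReduceBuffer[V]): ReduceBuffer[V] =
    if (b1.isEmpty) b2
    else if (b2.isEmpty) b1
    else ReduceBuffer(isEmpty = false, value = func(b1.value, b2.value))

  // Extract the final result; fails if no input was ever seen.
  def finish(b: ReduceBuffer[V]): V = {
    require(!b.isEmpty, "no values were reduced")
    b.value
  }
}

// Simulates two partitions reduced separately and then merged.
object ReduceDemo {
  val minAgg = new ReduceAggregator[Int]((a, b) => math.min(a, b))
  val left   = Seq(5, 3).foldLeft(minAgg.zero)(minAgg.reduce)
  val right  = Seq(9, 4).foldLeft(minAgg.zero)(minAgg.reduce)
  val result = minAgg.finish(minAgg.merge(left, right))
}
```

Because the empty buffer is skipped rather than combined, the same class works for addition, minimum, or any other associative function without a function-specific zero. Wiring this into an actual Aggregator would additionally need buffer and output encoders, which this sketch leaves out.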