Here's a ticket: https://issues.apache.org/jira/browse/SPARK-15598



On Fri, May 20, 2016 at 12:35 AM, Reynold Xin <r...@databricks.com> wrote:

> Andres - this is great feedback. Let me think about it a little bit more
> and reply later.
>
>
> On Thu, May 19, 2016 at 11:12 AM, Andres Perez <and...@tresata.com> wrote:
>
>> Hi all,
>>
>> We were in the process of porting an RDD program to one which uses
>> Datasets. Most things were easy to transition, but one hole in
>> functionality we found was the ability to reduce a Dataset by key,
>> something akin to PairRDDFunctions.reduceByKey. Our first attempt at adding
>> the functionality ourselves involved creating a KeyValueGroupedDataset and
>> calling reduceGroups to get the reduced Dataset.
>>
>>   class RichPairDataset[K, V: ClassTag](val ds: Dataset[(K, V)]) {
>>     def reduceByKey(func: (V, V) => V)(
>>         implicit e1: Encoder[K], e2: Encoder[V], e3: Encoder[(K, V)]): Dataset[(K, V)] =
>>       ds.groupByKey(_._1)
>>         .reduceGroups { (tup1, tup2) => (tup1._1, func(tup1._2, tup2._2)) }
>>         .map { case (k, (_, v)) => (k, v) }
>>   }
>>
>> Note that the function passed into .reduceGroups takes in the key-value
>> pair. It'd be nicer to pass in a function that maps just the values, i.e.
>> reduceGroups(func). This would require the ability to modify the values of
>> the KeyValueGroupedDataset (which is returned by the .groupByKey call on a
>> Dataset). Such a function (e.g., KeyValueGroupedDataset.mapValues(func: V
>> => U)) does not currently exist.
>>
>> The more important issue, however, is the inefficiency of .reduceGroups.
>> The function does not support partial aggregation (reducing map-side), and
>> as a result requires shuffling all the data in the Dataset. A more
>> efficient alternative that we explored involved creating a Dataset
>> from the KeyValueGroupedDataset by creating an Aggregator and passing it as
>> a TypedColumn to KeyValueGroupedDataset's .agg function. Unfortunately, the
>> Aggregator necessitated the creation of a zero to create a valid monoid.
>> However, the zero is dependent on the reduce function. The zero for a
>> function such as addition on Ints would be different from the zero for
>> taking the minimum over Ints, for example. The Aggregator requires that we
>> not break the rule of reduce(a, zero) == a. To do this we had to create an
>> Aggregator with a buffer type that stores the value along with a null flag
>> (using Scala's nice Option syntax yielded some mysterious errors that I
>> haven't worked through yet, unfortunately), used by the zero element to
>> signal that it should not participate in the reduce function.
>>
>> -Andy
>>
>
>
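
For readers following the zero-element discussion quoted above, here is a minimal, Spark-free sketch of the "value plus flag" buffer idea. It is not the code from the original message: Flagged, FlaggedReducer and FlaggedReduce are hypothetical names, and the zero/reduce/merge methods only mirror the general shape of an Aggregator rather than implementing Spark's actual class. The point it illustrates is that a flagged zero satisfies merge(a, zero) == a for any associative function, so the zero no longer depends on whether the function is addition or minimum.

```scala
// Hypothetical buffer: a value plus a flag. isEmpty = true marks the zero element.
final case class Flagged[V](isEmpty: Boolean, value: V)

// Hypothetical helper that lifts any associative (V, V) => V into an
// Aggregator-like zero/reduce/merge triple. Assumption: func is associative.
final class FlaggedReducer[V](func: (V, V) => V) {
  // The zero holds only a placeholder; the flag guarantees it is never passed to func.
  val zero: Flagged[V] = Flagged(isEmpty = true, null.asInstanceOf[V])

  // Fold one input value into a buffer.
  def reduce(b: Flagged[V], a: V): Flagged[V] =
    if (b.isEmpty) Flagged(isEmpty = false, a)
    else Flagged(isEmpty = false, func(b.value, a))

  // Combine two partial buffers; an empty buffer acts as the identity.
  def merge(b1: Flagged[V], b2: Flagged[V]): Flagged[V] =
    if (b1.isEmpty) b2
    else if (b2.isEmpty) b1
    else Flagged(isEmpty = false, func(b1.value, b2.value))
}

object FlaggedReduce {
  // Reduce each "partition" locally first, then merge the partial results.
  // This imitates map-side partial aggregation using plain Scala collections.
  def reduceInPartitions[V](partitions: Seq[Seq[V]])(func: (V, V) => V): Flagged[V] = {
    val r = new FlaggedReducer[V](func)
    val partials = partitions.map(_.foldLeft(r.zero)(r.reduce))
    partials.foldLeft(r.zero)(r.merge)
  }
}
```

With the partitions Seq(Seq(5, 3), Seq.empty[Int], Seq(9, 1)), the same helper handles both (a, b) => math.min(a, b) and _ + _ without a function-specific zero, and the empty partition simply contributes the flagged zero.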
