Re: GroupedDataset needs a mapValues

2016-02-14 Thread Koert Kuipers
great, by adding a little implicit wrapper i can use algebird's
MonoidAggregator, which gives me the equivalent of GroupedDataset.mapValues
(by using Aggregator.composePrepare)

i am a little surprised you require a monoid and not just a semigroup. but
probably the right choice given possibly empty datasets.
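
for reference, the wrapper is roughly this (a minimal sketch: the
AlgebirdAggregator name and the implicit conversion are mine, assuming
algebird's MonoidAggregator and spark 1.6's
org.apache.spark.sql.expressions.Aggregator, with encoders coming from
sqlContext.implicits._):

import com.twitter.algebird.MonoidAggregator
import org.apache.spark.sql.expressions.Aggregator

// adapt an algebird MonoidAggregator to a spark Aggregator. the monoid's
// zero is exactly what makes empty groups well-defined, hence monoid
// rather than semigroup.
class AlgebirdAggregator[A, B, C](agg: MonoidAggregator[A, B, C])
    extends Aggregator[A, B, C] {
  def zero: B = agg.monoid.zero
  def reduce(b: B, a: A): B = agg.monoid.plus(b, agg.prepare(a))
  def merge(b1: B, b2: B): B = agg.monoid.plus(b1, b2)
  def finish(b: B): C = agg.present(b)
}

implicit def toSparkAggregator[A, B, C](
    agg: MonoidAggregator[A, B, C]): Aggregator[A, B, C] =
  new AlgebirdAggregator(agg)

composePrepare then gives the mapValues equivalent, e.g. summing the values
of a Dataset[(String, Int)]:

// map each (key, value) pair to its value before aggregating
val sumValues = com.twitter.algebird.Aggregator.fromMonoid[Int]
  .composePrepare[(String, Int)](_._2)
ds1.groupBy(_._1).agg(toSparkAggregator(sumValues).toColumn)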

i do seem to be running into SPARK-12696
<https://issues.apache.org/jira/browse/SPARK-12696> for some aggregators,
so i will wait for spark 1.6.1

also i am having no luck using the aggregators with DataFrame instead of
Dataset. for example:

lazy val ds1 = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3))).toDS
ds1.toDF.groupBy($"_1").agg(Aggregator.toList[(String, Int)]).show

gives me:
[info]   org.apache.spark.sql.AnalysisException: unresolved operator
'Aggregate [_1#50],
[_1#50,(AggregatorAdapter(),mode=Complete,isDistinct=false) AS
AggregatorAdapter()#61];
[info]   at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
[info]   at
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
[info]   at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:203)
[info]   at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
[info]   at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:105)
[info]   at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
[info]   at
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
[info]   at
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
[info]   at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
[info]   at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)

my motivation for trying the DataFrame version is that it accepts an
unlimited number of aggregators via:
GroupedData.agg(expr: Column, exprs: Column*): DataFrame
there is no equivalent in GroupedDataset.
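
to illustrate what i mean (a sketch; sum/max/count are just stand-ins, and
as far as i can tell the typed side stops at four columns in 1.6):

import org.apache.spark.sql.functions.{count, max, sum}

// untyped api: agg(expr, exprs*) composes any number of aggregations
ds1.toDF.groupBy($"_1").agg(sum($"_2"), max($"_2"), count($"_2")).show()

// typed api: GroupedDataset.agg only offers fixed-arity overloads,
// agg[U1](col1) up to agg[U1, U2, U3, U4](col1, col2, col3, col4)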


On Sun, Feb 14, 2016 at 12:31 AM, Michael Armbrust wrote:

> Instead of grouping with a lambda function, you can do it with a column
> expression to avoid materializing an unnecessary tuple:
>
> df.groupBy($"_1")
>
> Regarding the mapValues, you can do something similar using an Aggregator,
> but I agree that we should consider something lighter weight like the
> mapValues you propose.
>
> On Sat, Feb 13, 2016 at 1:35 PM, Koert Kuipers wrote:
>
>> i have a Dataset[(K, V)]
>> i would like to group by k and then reduce V using a function (V, V) => V
>> how do i do this?
>>
>> i would expect something like:
>> val ds = Dataset[(K, V)] ds.groupBy(_._1).mapValues(_._2).reduce(f)
>> or better:
>> ds.grouped.reduce(f)  # grouped only works on Dataset[(_, _)] and i dont
>> care about java api
>>
>> but there is no mapValues or grouped. ds.groupBy(_._1) gives me a
>> GroupedDataset[(K, (K, V))] which is inconvenient. i could carry the key
>> through the reduce operation but that seems ugly and inefficient.
>>
>> any thoughts?
>>
>>
>>
>


Re: GroupedDataset needs a mapValues

2016-02-14 Thread Andy Davidson
Hi Michael

From:  Michael Armbrust <mich...@databricks.com>
Date:  Saturday, February 13, 2016 at 9:31 PM
To:  Koert Kuipers <ko...@tresata.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: GroupedDataset needs a mapValues

> Instead of grouping with a lambda function, you can do it with a column
> expression to avoid materializing an unnecessary tuple:
> 
> df.groupBy($"_1")


I am unfamiliar with this notation. Is there something similar for Java and
Python?

Kind regards

Andy


> 
> Regarding the mapValues, you can do something similar using an Aggregator
> <https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html>, but I agree that we should consider something lighter
> weight like the mapValues you propose.
> 
> On Sat, Feb 13, 2016 at 1:35 PM, Koert Kuipers <ko...@tresata.com> wrote:
>> i have a Dataset[(K, V)]
>> i would like to group by k and then reduce V using a function (V, V) => V
>> how do i do this?
>> 
>> i would expect something like:
>> val ds = Dataset[(K, V)] ds.groupBy(_._1).mapValues(_._2).reduce(f)
>> or better:
>> ds.grouped.reduce(f)  # grouped only works on Dataset[(_, _)] and i dont care
>> about java api
>> 
>> but there is no mapValues or grouped. ds.groupBy(_._1) gives me a
>> GroupedDataset[(K, (K, V))] which is inconvenient. i could carry the key
>> through the reduce operation but that seems ugly and inefficient.
>> 
>> any thoughts?
>> 
>> 
> 




GroupedDataset needs a mapValues

2016-02-13 Thread Koert Kuipers
i have a Dataset[(K, V)]
i would like to group by k and then reduce V using a function (V, V) => V
how do i do this?

i would expect something like:
val ds = Dataset[(K, V)] ds.groupBy(_._1).mapValues(_._2).reduce(f)
or better:
ds.grouped.reduce(f)  # grouped only works on Dataset[(_, _)] and i dont
care about java api

but there is no mapValues or grouped. ds.groupBy(_._1) gives me a
GroupedDataset[(K, (K, V))] which is inconvenient. i could carry the key
through the reduce operation but that seems ugly and inefficient.

any thoughts?
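
to spell out the ugly version (a sketch against spark 1.6, with concrete
types standing in for K and V, and implicits/encoders assumed in scope):

val ds = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3))).toDS
val f = (x: Int, y: Int) => x + y

// carry the key through the reduce, then drop it again
val viaReduce = ds
  .groupBy(_._1)                            // GroupedDataset[String, (String, Int)]
  .reduce((a, b) => (a._1, f(a._2, b._2)))  // Dataset[(String, (String, Int))]
  .map { case (k, (_, v)) => (k, v) }       // back to Dataset[(String, Int)]

// or sidestep the carried key with mapGroups
val viaMapGroups = ds
  .groupBy(_._1)
  .mapGroups { (k, it) => (k, it.map(_._2).reduce(f)) }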


Re: GroupedDataset needs a mapValues

2016-02-13 Thread Michael Armbrust
Instead of grouping with a lambda function, you can do it with a column
expression to avoid materializing an unnecessary tuple:

df.groupBy($"_1")

Regarding the mapValues, you can do something similar using an Aggregator
<https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html>,
but I agree that we should consider something lighter weight like the
mapValues you propose.
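
For example, given ds: Dataset[(String, Int)] (a sketch; if I remember the
1.6 signatures right, the column version keys the group untyped, and
keyAs[String] can re-type the key if needed):

// lambda grouping: runs the function per element, materializing each key
val byLambda = ds.groupBy(_._1)    // GroupedDataset[String, (String, Int)]

// column-expression grouping: the key stays a catalyst expression,
// so no extra tuple is materialized
val byColumn = ds.groupBy($"_1")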

On Sat, Feb 13, 2016 at 1:35 PM, Koert Kuipers wrote:

> i have a Dataset[(K, V)]
> i would like to group by k and then reduce V using a function (V, V) => V
> how do i do this?
>
> i would expect something like:
> val ds = Dataset[(K, V)] ds.groupBy(_._1).mapValues(_._2).reduce(f)
> or better:
> ds.grouped.reduce(f)  # grouped only works on Dataset[(_, _)] and i dont
> care about java api
>
> but there is no mapValues or grouped. ds.groupBy(_._1) gives me a
> GroupedDataset[(K, (K, V))] which is inconvenient. i could carry the key
> through the reduce operation but that seems ugly and inefficient.
>
> any thoughts?
>
>
>


Re: GroupedDataset needs a mapValues

2016-02-13 Thread Koert Kuipers
thanks i will look into Aggregator as well

On Sun, Feb 14, 2016 at 12:31 AM, Michael Armbrust wrote:

> Instead of grouping with a lambda function, you can do it with a column
> expression to avoid materializing an unnecessary tuple:
>
> df.groupBy($"_1")
>
> Regarding the mapValues, you can do something similar using an Aggregator,
> but I agree that we should consider something lighter weight like the
> mapValues you propose.
>
> On Sat, Feb 13, 2016 at 1:35 PM, Koert Kuipers wrote:
>
>> i have a Dataset[(K, V)]
>> i would like to group by k and then reduce V using a function (V, V) => V
>> how do i do this?
>>
>> i would expect something like:
>> val ds = Dataset[(K, V)] ds.groupBy(_._1).mapValues(_._2).reduce(f)
>> or better:
>> ds.grouped.reduce(f)  # grouped only works on Dataset[(_, _)] and i dont
>> care about java api
>>
>> but there is no mapValues or grouped. ds.groupBy(_._1) gives me a
>> GroupedDataset[(K, (K, V))] which is inconvenient. i could carry the key
>> through the reduce operation but that seems ugly and inefficient.
>>
>> any thoughts?
>>
>>
>>
>