Data Skew in Dataframe Groupby - Any suggestions?

2017-04-27 Thread KhajaAsmath Mohammed
Hi,

I am working on a requirement where I need to perform a groupBy on a set of
data and find the max value within each group.

The groupBy on the DataFrame is hitting data skew, and the job runs for quite
a long time (actually longer than Hive and Impala take for one day's worth of
data).

Any suggestions on how to overcome this?

dataframe.groupBy(Constants.Datapoint.Vin,Constants.Datapoint.Utctime,Constants.Datapoint.ProviderDesc,Constants.Datapoint.Latitude,Constants.Datapoint.Longitude)

*Note:* I have added coalesce and persisted the data to memory and disk as
well, but there is still no improvement.

Thanks,
Asmath.
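A common mitigation for skew in this situation, when the aggregate is algebraic (max, sum, count), is to salt the hot grouping key and aggregate in two stages. A minimal sketch, assuming a single skewed key column and a value column (both placeholder names, not from the original schema):

import org.apache.spark.sql.functions._

// Placeholder columns: "vin" is the skewed grouping key, "value" the measure.
val salted = dataframe.withColumn("salt", (rand() * 32).cast("int"))

// Stage 1: max per (key, salt) -- a hot key is spread over up to 32 tasks.
val partial = salted
  .groupBy("vin", "salt")
  .agg(max("value").as("partial_max"))

// Stage 2: max of the partial maxima per key -- each group is now small.
val result = partial
  .groupBy("vin")
  .agg(max("partial_max").as("max_value"))

Since max is already partially aggregated on the map side, it is also worth checking whether the time is going into shuffling the many grouping columns rather than into the aggregation itself.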


RE: forcing dataframe groupby partitioning

2017-01-29 Thread Mendelson, Assaf
Could you explain why this would work?
Assaf.

From: Haviv, Daniel [mailto:dha...@amazon.com]
Sent: Sunday, January 29, 2017 7:09 PM
To: Mendelson, Assaf
Cc: user@spark.apache.org
Subject: Re: forcing dataframe groupby partitioning

If there's no built-in local groupBy, you could do something like this:
df.groupby(C1,C2).agg(...).flatmap(x => x.groupBy(C1)).agg(...)

Thank you.
Daniel

On 29 Jan 2017, at 18:33, Mendelson, Assaf
<assaf.mendel...@rsa.com> wrote:
Hi,

Consider the following example:

df.groupby(C1,C2).agg(some agg).groupby(C1).agg(some more agg)

By default, Spark would shuffle by the combination of C1 and C2 and then
shuffle again by C1 only. This behavior makes sense when C2 is used to salt C1
for skewed data; however, when the two-level grouping is part of the actual
logic, the cost of two shuffles instead of one is significant (and even worse
when multiple iterations are done).

Is there a way to force the first groupby to shuffle the data so that the
second groupby can happen within the existing partitions?

Thanks,
Assaf.


Re: forcing dataframe groupby partitioning

2017-01-29 Thread Haviv, Daniel
If there's no built-in local groupBy, you could do something like this:
df.groupby(C1,C2).agg(...).flatmap(x => x.groupBy(C1)).agg(...)

Thank you.
Daniel

On 29 Jan 2017, at 18:33, Mendelson, Assaf 
> wrote:

Hi,

Consider the following example:

df.groupby(C1,C2).agg(some agg).groupby(C1).agg(some more agg)

By default, Spark would shuffle by the combination of C1 and C2 and then
shuffle again by C1 only. This behavior makes sense when C2 is used to salt C1
for skewed data; however, when the two-level grouping is part of the actual
logic, the cost of two shuffles instead of one is significant (and even worse
when multiple iterations are done).

Is there a way to force the first groupby to shuffle the data so that the
second groupby can happen within the existing partitions?

Thanks,
Assaf.


forcing dataframe groupby partitioning

2017-01-29 Thread Mendelson, Assaf
Hi,

Consider the following example:

df.groupby(C1,C2).agg(some agg).groupby(C1).agg(some more agg)

By default, Spark would shuffle by the combination of C1 and C2 and then
shuffle again by C1 only. This behavior makes sense when C2 is used to salt C1
for skewed data; however, when the two-level grouping is part of the actual
logic, the cost of two shuffles instead of one is significant (and even worse
when multiple iterations are done).

Is there a way to force the first groupby to shuffle the data so that the
second groupby can happen within the existing partitions?

Thanks,
Assaf.
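One possible approach, sketched under the assumption that Catalyst can reuse an existing hash partitioning (a partitioning on C1 also satisfies a grouping on (C1, C2)), is to repartition by C1 once up front and let both aggregations run on that layout; explain() should then show a single exchange. The column v is a placeholder:

import org.apache.spark.sql.functions._

// Hash-partition by C1 once; rows sharing C1 (and hence any (C1, C2) pair)
// end up in the same partition.
val byC1 = df.repartition(col("C1"))

val result = byC1
  .groupBy("C1", "C2").agg(sum("v").as("s"))   // expected to reuse the C1 partitioning
  .groupBy("C1").agg(max("s").as("maxS"))      // likewise, no further shuffle expected

result.explain()   // verify that only the repartition's Exchange appears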


How to change Spark DataFrame groupby("col1",..,"coln") into reduceByKey()?

2016-05-22 Thread unk1102
Hi, I have a Spark job that does a group by, and I can't avoid it because of my
use case. I have a large dataset, around 1 TB, which I need to process/update
as a DataFrame. The job shuffles a huge amount of data and slows down because
of the shuffling and the groupBy. One reason I see is that my data is skewed:
some of my group-by keys are empty. How do I avoid empty group-by keys in a
DataFrame? Does DataFrame skip empty group-by keys? I group by around 8 keys.

sourceFrame.select("blabla").groupBy("col1","col2","col3",..."col8").agg("blabla");

How do I change the above code to use reduceByKey()? Can aggregation be applied
with reduceByKey()? Please guide. Thanks in advance.
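For built-in aggregates such as max, the DataFrame groupBy already does map-side partial aggregation, so dropping to reduceByKey is not usually required; if it is wanted anyway, a rough sketch (the column positions, types, and empty-key filter are assumptions to adapt to the real schema) could look like this:

import org.apache.spark.sql.Row

// Drop empty/null keys first, if that is the intent.
val nonEmpty = sourceFrame.filter("col1 is not null and col1 <> ''")

// Key by the grouping columns (extend the tuple to all 8) and keep one measure.
val pairs = nonEmpty.rdd.map { (r: Row) =>
  val key = (r.getString(0), r.getString(1), r.getString(2))
  val value = r.getDouble(3)
  (key, value)
}

// reduceByKey combines values on the map side before shuffling.
val maxPerKey = pairs.reduceByKey((a, b) => math.max(a, b))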






Re: Pyspark - how to use UDFs with dataframe groupby

2016-02-10 Thread Davies Liu
Short answer: PySpark does not support UDAFs (user-defined aggregate
functions) for now.

On Tue, Feb 9, 2016 at 11:44 PM, Viktor ARDELEAN <viktor0...@gmail.com>
wrote:

> Hello,
>
> I am using following transformations on RDD:
>
> rddAgg = df.map(lambda l: (Row(a=l.a, b=l.b, c=l.c), l))\
>            .aggregateByKey([], lambda accumulatorList, value: accumulatorList + [value],
>                            lambda list1, list2: list1 + list2)
>
> I want to use the dataframe groupBy + agg transformation instead of map + 
> aggregateByKey because as far as I know dataframe transformations are faster 
> than RDD transformations.
>
> I just can't figure out how to use custom aggregate functions with agg.
>
> *First step is clear:*
>
> groupedData = df.groupBy("a","b","c")
>
> *Second step is not very clear to me:*
>
> dfAgg = groupedData.agg(<custom aggregate function that collects each group into a list and merges it?>)
>
> The agg documentation says the following:
> agg(*exprs)
> <https://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html?highlight=min#pyspark.sql.GroupedData.agg>
>
> Compute aggregates and returns the result as a DataFrame
> <https://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html?highlight=min#pyspark.sql.DataFrame>
> .
>
> The available aggregate functions are avg, max, min, sum, count.
>
> If exprs is a single dict mapping from string to string, then the key is
> the column to perform aggregation on, and the value is the aggregate
> function.
>
> Alternatively, exprs can also be a list of aggregate Column
> <https://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html?highlight=min#pyspark.sql.Column>
>  expressions.
> Parameters: *exprs* – a dict mapping from column name (string) to
> aggregate functions (string), or a list of Column
> <https://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html?highlight=min#pyspark.sql.Column>
> .
>
> Thanks for help!
> --
> Viktor
>
> *P*   Don't print this email, unless it's really necessary. Take care of
> the environment.
>


Pyspark - how to use UDFs with dataframe groupby

2016-02-09 Thread Viktor ARDELEAN
Hello,

I am using following transformations on RDD:

rddAgg = df.map(lambda l: (Row(a=l.a, b=l.b, c=l.c), l))\
           .aggregateByKey([], lambda accumulatorList, value: accumulatorList + [value],
                           lambda list1, list2: list1 + list2)

I want to use the dataframe groupBy + agg transformation instead of
map + aggregateByKey because as far as I know dataframe
transformations are faster than RDD transformations.

I just can't figure out how to use custom aggregate functions with agg.

*First step is clear:*

groupedData = df.groupBy("a","b","c")

*Second step is not very clear to me:*

dfAgg = groupedData.agg(<custom aggregate function that collects each group into a list and merges it?>)

The agg documentation says the following:
agg(*exprs)
<https://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html?highlight=min#pyspark.sql.GroupedData.agg>

Compute aggregates and returns the result as a DataFrame
<https://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html?highlight=min#pyspark.sql.DataFrame>
.

The available aggregate functions are avg, max, min, sum, count.

If exprs is a single dict mapping from string to string, then the key is
the column to perform aggregation on, and the value is the aggregate
function.

Alternatively, exprs can also be a list of aggregate Column
<https://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html?highlight=min#pyspark.sql.Column>
 expressions.
Parameters: *exprs* – a dict mapping from column name (string) to aggregate
functions (string), or a list of Column
<https://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html?highlight=min#pyspark.sql.Column>
.

Thanks for help!
-- 
Viktor

*P*   Don't print this email, unless it's really necessary. Take care of
the environment.


Re: How to ignore case in dataframe groupby?

2015-12-30 Thread Eran Witkon
Drop the original column and rename the new column.
See df.drop & df.withColumnRenamed.
Eran
On Wed, 30 Dec 2015 at 19:08 raja kbv  wrote:

> Solutions from Eran & Yanbo are working well. Thank you.
>
> @Eran,
>
> Your solution worked with a small change.
> DF.withColumn("upper-code",upper(df("countrycode"))).
>
> This creates a new column "upper-code". Is there a way to update the
> column in place, or create a new df with the updated column?
>
> Thanks,
> Raja
>
> On Thursday, 24 December 2015 6:17 PM, Eran Witkon 
> wrote:
>
>
> Use DF.withColumn("upper-code", df("countrycode").toUpper)
> or just run a map function that does the same
>
> On Thu, Dec 24, 2015 at 2:05 PM Bharathi Raja 
> wrote:
>
> Hi,
> Values in a dataframe column named countrycode are in different cases. Eg:
> (US, us).  groupBy & count gives two rows but the requirement is to ignore
> case for this operation.
> 1) Is there a way to ignore case in groupBy? Or
> 2) Is there a way to update the dataframe column countrycode to uppercase?
>
> Thanks in advance.
>
> Regards,
> Raja
>
>
>
>


Re: How to ignore case in dataframe groupby?

2015-12-30 Thread raja kbv
Solutions from Eran & Yanbo are working well. Thank you.

@Eran,

Your solution worked with a small change:
DF.withColumn("upper-code", upper(df("countrycode")))

This creates a new column "upper-code". Is there a way to update the column in
place, or create a new df with the updated column?

Thanks,
Raja
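A small sketch of one way to do that, assuming a reasonably recent Spark: withColumn called with an existing column name replaces that column, so countrycode can be overwritten in place instead of adding "upper-code" and then dropping/renaming:

import org.apache.spark.sql.functions.{col, upper}

// Passing an existing column name to withColumn replaces that column.
val updated = df.withColumn("countrycode", upper(col("countrycode")))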

On Thursday, 24 December 2015 6:17 PM, Eran Witkon  
wrote:
 

 Use DF.withColumn("upper-code", df("countrycode").toUpper)
or just run a map function that does the same
On Thu, Dec 24, 2015 at 2:05 PM Bharathi Raja  wrote:

Hi,
Values in a dataframe column named countrycode are in different cases. Eg: (US, 
us).  groupBy & count gives two rows but the requirement is to ignore case for 
this operation.
1) Is there a way to ignore case in groupBy? Or
2) Is there a way to update the dataframe column countrycode to uppercase?

Thanks in advance.

Regards,
Raja


  

Re: How to ignore case in dataframe groupby?

2015-12-24 Thread Yanbo Liang
You can use DF.groupBy(upper(col("a"))).agg(sum(col("b"))).
DataFrame provides the function "upper" to convert a column to uppercase.
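Applied to the countrycode example, a minimal sketch along those lines (assuming a DataFrame df with a countrycode column) would be:

import org.apache.spark.sql.functions._

// Group case-insensitively; the alias keeps a clean output column name.
val counts = df
  .groupBy(upper(col("countrycode")).as("countrycode"))
  .count()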

2015-12-24 20:47 GMT+08:00 Eran Witkon :

> Use DF.withColumn("upper-code", df("countrycode").toUpper)
> or just run a map function that does the same
>
> On Thu, Dec 24, 2015 at 2:05 PM Bharathi Raja 
> wrote:
>
>> Hi,
>> Values in a dataframe column named countrycode are in different cases.
>> Eg: (US, us).  groupBy & count gives two rows but the requirement is to
>> ignore case for this operation.
>> 1) Is there a way to ignore case in groupBy? Or
>> 2) Is there a way to update the dataframe column countrycode to uppercase?
>>
>> Thanks in advance.
>>
>> Regards,
>> Raja
>>
>


How to ignore case in dataframe groupby?

2015-12-24 Thread Bharathi Raja
Hi,
Values in a dataframe column named countrycode are in different cases. Eg: (US, 
us).  groupBy & count gives two rows but the requirement is to ignore case for 
this operation.
1) Is there a way to ignore case in groupBy? Or
2) Is there a way to update the dataframe column countrycode to uppercase?

Thanks in advance.

Regards,
Raja

Re: How to ignore case in dataframe groupby?

2015-12-24 Thread Eran Witkon
Use DF.withColumn("upper-code", df("countrycode").toUpper)
or just run a map function that does the same

On Thu, Dec 24, 2015 at 2:05 PM Bharathi Raja 
wrote:

> Hi,
> Values in a dataframe column named countrycode are in different cases. Eg:
> (US, us).  groupBy & count gives two rows but the requirement is to ignore
> case for this operation.
> 1) Is there a way to ignore case in groupBy? Or
> 2) Is there a way to update the dataframe column countrycode to uppercase?
>
> Thanks in advance.
>
> Regards,
> Raja
>


Re: Spark DataFrame GroupBy into List

2015-10-14 Thread Deenar Toraskar
collect_set and collect_list are built-in Hive user-defined aggregate functions; see
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
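In later Spark releases (1.6 onwards) collect_list and collect_set are also exposed directly in org.apache.spark.sql.functions, so the callUDF indirection is no longer needed. A small sketch of the example from this thread under that assumption:

import org.apache.spark.sql.functions._
// assuming a SparkSession named `spark` is in scope:
import spark.implicits._

val df = Seq(("A", 1), ("A", 2), ("B", 3), ("B", 4), ("C", 5)).toDF("category", "id")

// collect_list keeps duplicates; collect_set de-duplicates.
val grouped = df
  .groupBy("category")
  .agg(collect_list("id").as("id_list"))

grouped.show()   // A -> [1, 2], B -> [3, 4], C -> [5]   (row order may vary)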

On 14 October 2015 at 03:45, SLiZn Liu  wrote:

> Hi Michael,
>
> Can you be more specific on `collect_set`? Is it a built-in function or,
> if it is an UDF, how it is defined?
>
> BR,
> Todd Leo
>
> On Wed, Oct 14, 2015 at 2:12 AM Michael Armbrust 
> wrote:
>
>> import org.apache.spark.sql.functions._
>>
>> df.groupBy("category")
>>   .agg(callUDF("collect_set", df("id")).as("id_list"))
>>
>> On Mon, Oct 12, 2015 at 11:08 PM, SLiZn Liu 
>> wrote:
>>
>>> Hey Spark users,
>>>
>>> I'm trying to group by a dataframe, by appending occurrences into a list
>>> instead of count.
>>>
>>> Let's say we have a dataframe as shown below:
>>>
>>> | category | id |
>>> |  |:--:|
>>> | A| 1  |
>>> | A| 2  |
>>> | B| 3  |
>>> | B| 4  |
>>> | C| 5  |
>>>
>>> ideally, after some magic group by (reverse explode?):
>>>
>>> | category | id_list  |
>>> |  |  |
>>> | A| 1,2  |
>>> | B| 3,4  |
>>> | C| 5|
>>>
>>> any tricks to achieve that? Scala Spark API is preferred. =D
>>>
>>> BR,
>>> Todd Leo
>>>
>>>
>>>
>>>
>>


RE: Spark DataFrame GroupBy into List

2015-10-14 Thread java8964
My guess is that it is the same as the collect_set UDAF in Hive.
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inAggregateFunctions(UDAF)
Yong

From: sliznmail...@gmail.com
Date: Wed, 14 Oct 2015 02:45:48 +
Subject: Re: Spark DataFrame GroupBy into List
To: mich...@databricks.com
CC: user@spark.apache.org

Hi Michael,

Can you be more specific on `collect_set`? Is it a built-in function or, if it
is an UDF, how it is defined?

BR,
Todd Leo

On Wed, Oct 14, 2015 at 2:12 AM Michael Armbrust <mich...@databricks.com> wrote:

import org.apache.spark.sql.functions._

df.groupBy("category")
  .agg(callUDF("collect_set", df("id")).as("id_list"))

On Mon, Oct 12, 2015 at 11:08 PM, SLiZn Liu <sliznmail...@gmail.com> wrote:

Hey Spark users,

I'm trying to group by a dataframe, by appending occurrences into a list
instead of count.

Let's say we have a dataframe as shown below:

| category | id |
|  |:--:|
| A| 1  |
| A| 2  |
| B| 3  |
| B| 4  |
| C| 5  |

ideally, after some magic group by (reverse explode?):

| category | id_list  |
|  |  |
| A| 1,2  |
| B| 3,4  |
| C| 5|

any tricks to achieve that? Scala Spark API is preferred. =D

BR,
Todd Leo





  

Re: Spark DataFrame GroupBy into List

2015-10-14 Thread Michael Armbrust
That's correct. It is a Hive UDAF.

On Wed, Oct 14, 2015 at 6:45 AM, java8964 <java8...@hotmail.com> wrote:

> My guess is the same as UDAF of (collect_set) in Hive.
>
>
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inAggregateFunctions(UDAF)
>
> Yong
>
> --
> From: sliznmail...@gmail.com
> Date: Wed, 14 Oct 2015 02:45:48 +
> Subject: Re: Spark DataFrame GroupBy into List
> To: mich...@databricks.com
> CC: user@spark.apache.org
>
>
> Hi Michael,
>
> Can you be more specific on `collect_set`? Is it a built-in function or,
> if it is an UDF, how it is defined?
>
> BR,
> Todd Leo
>
> On Wed, Oct 14, 2015 at 2:12 AM Michael Armbrust <mich...@databricks.com>
> wrote:
>
> import org.apache.spark.sql.functions._
>
> df.groupBy("category")
>   .agg(callUDF("collect_set", df("id")).as("id_list"))
>
> On Mon, Oct 12, 2015 at 11:08 PM, SLiZn Liu <sliznmail...@gmail.com>
> wrote:
>
> Hey Spark users,
>
> I'm trying to group by a dataframe, by appending occurrences into a list
> instead of count.
>
> Let's say we have a dataframe as shown below:
>
> | category | id |
> |  |:--:|
> | A| 1  |
> | A| 2  |
> | B| 3  |
> | B| 4  |
> | C| 5  |
>
> ideally, after some magic group by (reverse explode?):
>
> | category | id_list  |
> |  |  |
> | A| 1,2  |
> | B| 3,4  |
> | C| 5|
>
> any tricks to achieve that? Scala Spark API is preferred. =D
>
> BR,
> Todd Leo
>
>
>
>
>


Re: Spark DataFrame GroupBy into List

2015-10-14 Thread SLiZn Liu
Thanks, Michael and java8964!

Does HiveContext also provide a UDF for combining existing lists into a
flattened (not nested) list? (list -> list of lists -[flatten]-> list)
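One way to get there without a dedicated flatten UDF, sketched with functions from later Spark releases (the column names and the `nested` DataFrame are assumptions), is to explode the nested arrays back into rows and re-collect at the coarser key; Spark 2.4+ also ships a built-in flatten for array-of-array columns:

import org.apache.spark.sql.functions._

// Hypothetical input: `nested` has columns (category: String, lists: Array[Array[Int]]).
val flat = nested
  .select(col("category"), explode(col("lists")).as("inner"))
  .select(col("category"), explode(col("inner")).as("id"))
  .groupBy("category")
  .agg(collect_list("id").as("flat_ids"))

// Spark 2.4+: the same in one step with the built-in flatten function.
val flat24 = nested.select(col("category"), flatten(col("lists")).as("flat_ids"))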

On Thu, Oct 15, 2015 at 1:16 AM Michael Armbrust <mich...@databricks.com>
wrote:

> That's correct. It is a Hive UDAF.
>
> On Wed, Oct 14, 2015 at 6:45 AM, java8964 <java8...@hotmail.com> wrote:
>
>> My guess is the same as UDAF of (collect_set) in Hive.
>>
>>
>> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inAggregateFunctions(UDAF)
>>
>> Yong
>>
>> --
>> From: sliznmail...@gmail.com
>> Date: Wed, 14 Oct 2015 02:45:48 +
>> Subject: Re: Spark DataFrame GroupBy into List
>> To: mich...@databricks.com
>> CC: user@spark.apache.org
>>
>>
>> Hi Michael,
>>
>> Can you be more specific on `collect_set`? Is it a built-in function or,
>> if it is an UDF, how it is defined?
>>
>> BR,
>> Todd Leo
>>
>> On Wed, Oct 14, 2015 at 2:12 AM Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>> import org.apache.spark.sql.functions._
>>
>> df.groupBy("category")
>>   .agg(callUDF("collect_set", df("id")).as("id_list"))
>>
>> On Mon, Oct 12, 2015 at 11:08 PM, SLiZn Liu <sliznmail...@gmail.com>
>> wrote:
>>
>> Hey Spark users,
>>
>> I'm trying to group by a dataframe, by appending occurrences into a list
>> instead of count.
>>
>> Let's say we have a dataframe as shown below:
>>
>> | category | id |
>> |  |:--:|
>> | A| 1  |
>> | A| 2  |
>> | B| 3  |
>> | B| 4  |
>> | C| 5  |
>>
>> ideally, after some magic group by (reverse explode?):
>>
>> | category | id_list  |
>> |  |  |
>> | A| 1,2  |
>> | B| 3,4  |
>> | C| 5|
>>
>> any tricks to achieve that? Scala Spark API is preferred. =D
>>
>> BR,
>> Todd Leo
>>
>>
>>
>>
>>
>


Re: Spark DataFrame GroupBy into List

2015-10-13 Thread SLiZn Liu
Hi Michael,

Can you be more specific on `collect_set`? Is it a built-in function or, if
it is an UDF, how it is defined?

BR,
Todd Leo

On Wed, Oct 14, 2015 at 2:12 AM Michael Armbrust 
wrote:

> import org.apache.spark.sql.functions._
>
> df.groupBy("category")
>   .agg(callUDF("collect_set", df("id")).as("id_list"))
>
> On Mon, Oct 12, 2015 at 11:08 PM, SLiZn Liu 
> wrote:
>
>> Hey Spark users,
>>
>> I'm trying to group by a dataframe, by appending occurrences into a list
>> instead of count.
>>
>> Let's say we have a dataframe as shown below:
>>
>> | category | id |
>> |  |:--:|
>> | A| 1  |
>> | A| 2  |
>> | B| 3  |
>> | B| 4  |
>> | C| 5  |
>>
>> ideally, after some magic group by (reverse explode?):
>>
>> | category | id_list  |
>> |  |  |
>> | A| 1,2  |
>> | B| 3,4  |
>> | C| 5|
>>
>> any tricks to achieve that? Scala Spark API is preferred. =D
>>
>> BR,
>> Todd Leo
>>
>>
>>
>>
>


Re: Spark DataFrame GroupBy into List

2015-10-13 Thread Rishitesh Mishra
Hi Liu,
I could not see any operator on DataFrame that will give the desired
result. The DataFrame API, as expected, works on the Row format with a fixed
set of operators.
However, you can achieve the desired result by accessing the underlying RDD as
below:

val s = Seq(Test("A",1), Test("A",2), Test("B",1), Test("B",2))
val rdd = testSparkContext.parallelize(s)
val df = snc.createDataFrame(rdd)
val rdd1 = df.rdd.map(p => (Seq(p.getString(0)), Seq(p.getInt(1))))

val reduceF = (p: Seq[Int], q: Seq[Int]) => p ++ q

val rdd3 = rdd1.reduceByKey(reduceF)
rdd3.foreach(r => println(r))



You can always convert the RDD back to a DataFrame after the transformation
and reduce.


Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://www.linkedin.com/profile/view?id=AAIAAAIFdkMB_v-nolCrFH6_pKf9oH6tZD8Qlgo=nav_responsive_tab_profile

On Tue, Oct 13, 2015 at 11:38 AM, SLiZn Liu  wrote:

> Hey Spark users,
>
> I'm trying to group by a dataframe, by appending occurrences into a list
> instead of count.
>
> Let's say we have a dataframe as shown below:
>
> | category | id |
> |  |:--:|
> | A| 1  |
> | A| 2  |
> | B| 3  |
> | B| 4  |
> | C| 5  |
>
> ideally, after some magic group by (reverse explode?):
>
> | category | id_list  |
> |  |  |
> | A| 1,2  |
> | B| 3,4  |
> | C| 5|
>
> any tricks to achieve that? Scala Spark API is preferred. =D
>
> BR,
> Todd Leo
>
>
>
>


--


Re: Spark DataFrame GroupBy into List

2015-10-13 Thread SLiZn Liu
Hi Rishitesh,

I did it with combineByKey, but your solution is clearer and more readable; at
least it doesn't require three lambda functions to get confused by. I will
definitely try it out tomorrow, thanks.

Plus, OutOfMemoryError keeps bothering me as I read a massive number of JSON
files, whereas the RDD yielded by combineByKey is rather small. Anyway,
I'll send another mail to describe this.

BR,
Todd Leo

Rishitesh Mishra wrote on Tuesday, 13 October 2015 at 19:05:

> Hi Liu,
> I could not see any operator on DataFrame that will give the desired
> result. The DataFrame API, as expected, works on the Row format with a fixed
> set of operators.
> However, you can achieve the desired result by accessing the underlying RDD as
> below:
>
> val s = Seq(Test("A",1), Test("A",2), Test("B",1), Test("B",2))
> val rdd = testSparkContext.parallelize(s)
> val df = snc.createDataFrame(rdd)
> val rdd1 = df.rdd.map(p => (Seq(p.getString(0)), Seq(p.getInt(1))))
>
> val reduceF = (p: Seq[Int], q: Seq[Int]) => p ++ q
>
> val rdd3 = rdd1.reduceByKey(reduceF)
> rdd3.foreach(r => println(r))
>
>
>
> You can always convert the RDD back to a DataFrame after the transformation
> and reduce.
>
>
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
>
> https://www.linkedin.com/profile/view?id=AAIAAAIFdkMB_v-nolCrFH6_pKf9oH6tZD8Qlgo=nav_responsive_tab_profile
>
> On Tue, Oct 13, 2015 at 11:38 AM, SLiZn Liu 
> wrote:
>
>> Hey Spark users,
>>
>> I'm trying to group by a dataframe, by appending occurrences into a list
>> instead of count.
>>
>> Let's say we have a dataframe as shown below:
>>
>> | category | id |
>> |  |:--:|
>> | A| 1  |
>> | A| 2  |
>> | B| 3  |
>> | B| 4  |
>> | C| 5  |
>>
>> ideally, after some magic group by (reverse explode?):
>>
>> | category | id_list  |
>> |  |  |
>> | A| 1,2  |
>> | B| 3,4  |
>> | C| 5|
>>
>> any tricks to achieve that? Scala Spark API is preferred. =D
>>
>> BR,
>> Todd Leo
>>
>>
>>
>>
>
>
> --
>
>
>


Re: Spark DataFrame GroupBy into List

2015-10-13 Thread Michael Armbrust
import org.apache.spark.sql.functions._

df.groupBy("category")
  .agg(callUDF("collect_set", df("id")).as("id_list"))

On Mon, Oct 12, 2015 at 11:08 PM, SLiZn Liu  wrote:

> Hey Spark users,
>
> I'm trying to group by a dataframe, by appending occurrences into a list
> instead of count.
>
> Let's say we have a dataframe as shown below:
>
> | category | id |
> |  |:--:|
> | A| 1  |
> | A| 2  |
> | B| 3  |
> | B| 4  |
> | C| 5  |
>
> ideally, after some magic group by (reverse explode?):
>
> | category | id_list  |
> |  |  |
> | A| 1,2  |
> | B| 3,4  |
> | C| 5|
>
> any tricks to achieve that? Scala Spark API is preferred. =D
>
> BR,
> Todd Leo
>
>
>
>


Spark DataFrame GroupBy into List

2015-10-13 Thread SLiZn Liu
Hey Spark users,

I'm trying to group by a dataframe, by appending occurrences into a list
instead of count.

Let's say we have a dataframe as shown below:

| category | id |
|  |:--:|
| A| 1  |
| A| 2  |
| B| 3  |
| B| 4  |
| C| 5  |

ideally, after some magic group by (reverse explode?):

| category | id_list  |
|  |  |
| A| 1,2  |
| B| 3,4  |
| C| 5|

any tricks to achieve that? Scala Spark API is preferred. =D

BR,
Todd Leo


Re: DataFrame groupBy vs RDD groupBy

2015-05-23 Thread ayan guha
Hi Michael

This is great info. I am currently using the repartitionAndSortWithinPartitions
function to achieve the same. Is this the recommended way until 1.4, or is
there any better way?
On 23 May 2015 07:38, Michael Armbrust mich...@databricks.com wrote:

 DataFrames have a lot more information about the data, so there is a whole
 class of optimizations that are possible there that we cannot do in RDDs.
 This is why we are focusing a lot of effort on this part of the project.
 In Spark 1.4 you can accomplish what you want using the new window function
 feature.  This can be done with SQL as you described or directly on a
 DataFrame:

 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.expressions._

 val df = Seq(("a", 1), ("b", 1), ("c", 2), ("d", 2)).toDF("x", "y")
 df.select('x, 'y,
   rowNumber.over(Window.partitionBy("y").orderBy("x")).as("number")).show()

 +---+---+------+
 |  x|  y|number|
 +---+---+------+
 |  a|  1|     1|
 |  b|  1|     2|
 |  c|  2|     1|
 |  d|  2|     2|
 +---+---+------+

 On Fri, May 22, 2015 at 3:35 AM, gtanguy g.tanguy.claravi...@gmail.com
 wrote:

 Hello everybody,

 I have two questions in one. I upgraded from Spark 1.1 to 1.3 and some parts
 of my code using groupBy became really slow.

 *1/* Why is the groupBy of an RDD really slow in comparison to the groupBy
 of a DataFrame?

 // DataFrame : running in a few seconds
 val result = table.groupBy("col1").count

 // RDD : taking hours with a lot of spilling in-memory
 val schemaOriginel = table.schema
 val result = table.rdd.groupBy { r =>
   val rs = RowSchema(r, schemaOriginel)
   val col1 = rs.getValueByName("col1")
   col1
 }.map(l => (l._1, l._2.size)).count()


 *2/* My goal is to groupBy on a key, then order each group over a column,
 and finally add the row number within each group. I had this code running
 before changing to Spark 1.3 and it worked fine, but since I have changed to
 DataFrame it is really slow.

 val schemaOriginel = table.schema
 val result = table.rdd.groupBy { r =>
   val rs = RowSchema(r, schemaOriginel)
   val col1 = rs.getValueByName("col1")
   col1
 }.flatMap { l =>
   l._2.toList
     .sortBy { u =>
       val rs = RowSchema(u, schemaOriginel)
       val col1 = rs.getValueByName("col1")
       val col2 = rs.getValueByName("col2")
       (col1, col2)
     }.zipWithIndex
 }

 I think the SQL equivalent of what I am trying to do is:

 SELECT a,
        ROW_NUMBER() OVER (PARTITION BY a) AS num
 FROM table

 I don't think I can do this with a GroupedData (the result of df.groupBy).
 Any ideas on how I can speed this up?








Re: DataFrame groupBy vs RDD groupBy

2015-05-22 Thread Michael Armbrust
DataFrames have a lot more information about the data, so there is a whole
class of optimizations that are possible there that we cannot do in RDDs.
This is why we are focusing a lot of effort on this part of the project.
In Spark 1.4 you can accomplish what you want using the new window function
feature.  This can be done with SQL as you described or directly on a
DataFrame:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val df = Seq(("a", 1), ("b", 1), ("c", 2), ("d", 2)).toDF("x", "y")
df.select('x, 'y,
  rowNumber.over(Window.partitionBy("y").orderBy("x")).as("number")).show()

+---+---+------+
|  x|  y|number|
+---+---+------+
|  a|  1|     1|
|  b|  1|     2|
|  c|  2|     1|
|  d|  2|     2|
+---+---+------+

On Fri, May 22, 2015 at 3:35 AM, gtanguy g.tanguy.claravi...@gmail.com
wrote:

 Hello everybody,

 I have two questions in one. I upgraded from Spark 1.1 to 1.3 and some parts
 of my code using groupBy became really slow.

 *1/* Why is the groupBy of an RDD really slow in comparison to the groupBy
 of a DataFrame?

 // DataFrame : running in a few seconds
 val result = table.groupBy("col1").count

 // RDD : taking hours with a lot of spilling in-memory
 val schemaOriginel = table.schema
 val result = table.rdd.groupBy { r =>
   val rs = RowSchema(r, schemaOriginel)
   val col1 = rs.getValueByName("col1")
   col1
 }.map(l => (l._1, l._2.size)).count()


 *2/* My goal is to groupBy on a key, then order each group over a column,
 and finally add the row number within each group. I had this code running
 before changing to Spark 1.3 and it worked fine, but since I have changed to
 DataFrame it is really slow.

 val schemaOriginel = table.schema
 val result = table.rdd.groupBy { r =>
   val rs = RowSchema(r, schemaOriginel)
   val col1 = rs.getValueByName("col1")
   col1
 }.flatMap { l =>
   l._2.toList
     .sortBy { u =>
       val rs = RowSchema(u, schemaOriginel)
       val col1 = rs.getValueByName("col1")
       val col2 = rs.getValueByName("col2")
       (col1, col2)
     }.zipWithIndex
 }

 I think the SQL equivalent of what I am trying to do is:

 SELECT a,
        ROW_NUMBER() OVER (PARTITION BY a) AS num
 FROM table

 I don't think I can do this with a GroupedData (the result of df.groupBy).
 Any ideas on how I can speed this up?







Re: DataFrame groupBy MapType

2015-04-07 Thread Justin Yip
Thanks Michael. Will submit a ticket.

Justin

On Mon, Apr 6, 2015 at 1:53 PM, Michael Armbrust mich...@databricks.com
wrote:

 I'll add that I don't think there is a convenient way to do this in the
 Column API ATM, but would welcome a JIRA for adding it :)

 On Mon, Apr 6, 2015 at 1:45 PM, Michael Armbrust mich...@databricks.com
 wrote:

 In HiveQL, you should be able to express this as:

 SELECT ... FROM table GROUP BY m['SomeKey']

 On Sat, Apr 4, 2015 at 5:25 PM, Justin Yip yipjus...@prediction.io
 wrote:

 Hello,

 I have a case class like this:

 case class A(
   m: Map[Long, Long],
   ...
 )

 and constructed a DataFrame from Seq[A].

 I would like to perform a groupBy on A.m(SomeKey). I can implement a
 UDF, create a new Column, then invoke a groupBy on the new Column. But is that
 the idiomatic way of doing such an operation?

 I can't find much info about operating on MapType columns in the docs.

 Thanks ahead!

 Justin






Re: DataFrame groupBy MapType

2015-04-06 Thread Michael Armbrust
In HiveQL, you should be able to express this as:

SELECT ... FROM table GROUP BY m['SomeKey']
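In later Spark versions the same grouping can also be written against the Column API without a UDF; a sketch under that assumption (the key 42 is just an illustrative literal for the Map[Long, Long] column):

import org.apache.spark.sql.functions._

// Column.getItem extracts the value for a given map key (null when the key is absent).
val byKey = df.groupBy(col("m").getItem(42L).as("someKey")).count()

// Equivalent with a SQL expression string:
val byKeyExpr = df.groupBy(expr("m[42]").as("someKey")).count()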

On Sat, Apr 4, 2015 at 5:25 PM, Justin Yip yipjus...@prediction.io wrote:

 Hello,

 I have a case class like this:

 case class A(
   m: Map[Long, Long],
   ...
 )

 and constructed a DataFrame from Seq[A].

 I would like to perform a groupBy on A.m(SomeKey). I can implement a
 UDF, create a new Column, then invoke a groupBy on the new Column. But is that
 the idiomatic way of doing such an operation?

 I can't find much info about operating on MapType columns in the docs.

 Thanks ahead!

 Justin



Re: DataFrame groupBy MapType

2015-04-06 Thread Michael Armbrust
I'll add that I don't think there is a convenient way to do this in the
Column API ATM, but would welcome a JIRA for adding it :)

On Mon, Apr 6, 2015 at 1:45 PM, Michael Armbrust mich...@databricks.com
wrote:

 In HiveQL, you should be able to express this as:

 SELECT ... FROM table GROUP BY m['SomeKey']

 On Sat, Apr 4, 2015 at 5:25 PM, Justin Yip yipjus...@prediction.io
 wrote:

 Hello,

 I have a case class like this:

 case class A(
   m: Map[Long, Long],
   ...
 )

 and constructed a DataFrame from Seq[A].

 I would like to perform a groupBy on A.m(SomeKey). I can implement a
 UDF, create a new Column, then invoke a groupBy on the new Column. But is that
 the idiomatic way of doing such an operation?

 I can't find much info about operating on MapType columns in the docs.

 Thanks ahead!

 Justin





DataFrame groupBy MapType

2015-04-04 Thread Justin Yip
Hello,

I have a case class like this:

case class A(
  m: Map[Long, Long],
  ...
)

and constructed a DataFrame from Seq[A].

I would like to perform a groupBy on A.m(SomeKey). I can implement a UDF,
create a new Column, then invoke a groupBy on the new Column. But is that the
idiomatic way of doing such an operation?

I can't find much info about operating on MapType columns in the docs.

Thanks ahead!

Justin


DataFrame GroupBy

2015-03-26 Thread gtanguy
Hello everybody,

I am trying to do a simple groupBy:

*Code:*
val df = hiveContext.sql("SELECT * FROM table1")
df.printSchema()
df.groupBy("customer_id").count().show(5)

*Stacktrace* :
root
 |-- customer_id: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- reco_material_id: string (nullable = true)
 |-- score: string (nullable = true)
 |-- category: string (nullable = true)
 |-- is_achat: string (nullable = true)

15/03/26 17:19:29 INFO HiveMetaStore: 0: get_table : db=default tbl=table1
15/03/26 17:19:29 INFO audit: ugi=spark ip=unknown-ip-addr  cmd=get_table :
db=default tbl=table1   
Exception in thread "main" java.util.NoSuchElementException: key not found:
customer_id#0
at scala.collection.MapLike$class.default(MapLike.scala:228)
at
org.apache.spark.sql.catalyst.expressions.AttributeMap.default(AttributeMap.scala:29)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at
org.apache.spark.sql.catalyst.expressions.AttributeMap.apply(AttributeMap.scala:29)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.sql.hive.execution.HiveTableScan.init(HiveTableScan.scala:53)
at
org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$15.apply(HiveStrategies.scala:216)
at
org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$15.apply(HiveStrategies.scala:216)
at
org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:1034)
at
org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$.apply(HiveStrategies.scala:212)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at
org.apache.spark.sql.execution.SparkStrategies$HashAggregation$.apply(SparkStrategies.scala:152)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at
org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:290)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
at
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:1081)
at
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:1079)
at
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:1085)
at
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:1085)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:815)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:758)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:809)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:314)


Does anyone have an idea?

Regards,

Germain Tanguy.






Dataframe groupby custom functions (python)

2015-03-24 Thread jamborta
Hi all,

I have been trying out the new dataframe api in 1.3, which looks great by
the way.

I have found an example to define udfs and add them to select operations,
like this:

slen = F.udf(lambda s: len(s), IntegerType())
df.select(df.age, slen(df.name).alias('slen')).collect()

is it possible to do something similar with aggregates? Something like this:

gdf = df.groupBy(df.name)
gdf.agg(slen(df.age)).collect()

thanks,






Re: Dataframe groupby custom functions (python)

2015-03-24 Thread Michael Armbrust
The only UDAFs that we support today are those defined using the Hive UDAF
API.  Otherwise you'll have to drop into Spark operations.  I'd suggest
opening a JIRA.

On Tue, Mar 24, 2015 at 10:49 AM, jamborta jambo...@gmail.com wrote:

 Hi all,

 I have been trying out the new dataframe api in 1.3, which looks great by
 the way.

 I have found an example to define udfs and add them to select operations,
 like this:

 slen = F.udf(lambda s: len(s), IntegerType())
 df.select(df.age, slen(df.name).alias('slen')).collect()

 is it possible to do something similar with aggregates? Something like
 this:

 gdf = df.groupBy(df.name)
 gdf.agg(slen(df.age)).collect()

 thanks,


