Data Skew in Dataframe Groupby - Any suggestions?
Hi,

I am working on a requirement where I need to perform a groupBy on a set of data and find the max value within each group. The groupBy on the DataFrame is resulting in skew, and the job runs for quite a long time (actually longer than Hive and Impala on one day's worth of data). Any suggestions on how to overcome this?

dataframe.groupBy(Constants.Datapoint.Vin, Constants.Datapoint.Utctime, Constants.Datapoint.ProviderDesc, Constants.Datapoint.Latitude, Constants.Datapoint.Longitude)

Note: I have added coalesce and persisted the data to memory and disk too; still no improvement.

Thanks,
Asmath.
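Since max is associative and commutative, skew on a hot key can usually be mitigated with a two-phase "salted" aggregation: add a random salt to the key, take the max per (key, salt), then take the max of the partial results per key. The thread doesn't include code for this, so here is a plain-Python sketch of the idea only (in Spark it would be a groupBy over the key plus an added salt column, followed by a second groupBy over the key alone); all names are illustrative:

```python
import random
from collections import defaultdict

def salted_max(records, num_salts=4):
    """Two-phase max: first reduce within (key, salt) sub-groups so a hot
    key is spread across num_salts partial groups, then combine the
    per-salt partial maxima into the final per-key max."""
    # Phase 1: partial max per (key, salt) -- in Spark, the first,
    # better-balanced shuffle.
    partial = defaultdict(lambda: float("-inf"))
    for key, value in records:
        salt = random.randrange(num_salts)
        partial[(key, salt)] = max(partial[(key, salt)], value)
    # Phase 2: combine the (at most num_salts) partials per key.
    final = defaultdict(lambda: float("-inf"))
    for (key, _salt), value in partial.items():
        final[key] = max(final[key], value)
    return dict(final)

data = [("vin1", 10), ("vin1", 99), ("vin1", 5), ("vin2", 7)]
print(salted_max(data))  # {'vin1': 99, 'vin2': 7}
```

The result is independent of the random salt assignment precisely because max is associative, which is what makes the trick safe for this aggregate.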
RE: forcing dataframe groupby partitioning
Could you explain why this would work?

Assaf.

From: Haviv, Daniel [mailto:dha...@amazon.com]
Sent: Sunday, January 29, 2017 7:09 PM
To: Mendelson, Assaf
Cc: user@spark.apache.org
Subject: Re: forcing dataframe groupby partitioning

If there's no built-in local groupBy, you could do something like this:

df.groupBy(C1, C2).agg(...).flatMap(x => x.groupBy(C1)).agg

Thank you.
Daniel

On 29 Jan 2017, at 18:33, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:

Hi,

Consider the following example:

df.groupBy(C1, C2).agg(some agg).groupBy(C1).agg(some more agg)

By default, Spark will shuffle according to the combination of C1 and C2 and then shuffle again by C1 only. This behavior makes sense when C2 is used to salt C1 for skewed data; however, when the double grouping is part of the logic, the cost of doing two shuffles instead of one is not insignificant (and even worse when multiple iterations are done). Is there a way to force the first groupBy to shuffle the data so that the second groupBy occurs within the partition?

Thanks,
Assaf.
Re: forcing dataframe groupby partitioning
If there's no built-in local groupBy, you could do something like this:

df.groupBy(C1, C2).agg(...).flatMap(x => x.groupBy(C1)).agg

Thank you.
Daniel

On 29 Jan 2017, at 18:33, Mendelson, Assaf wrote:

Hi,

Consider the following example:

df.groupBy(C1, C2).agg(some agg).groupBy(C1).agg(some more agg)

By default, Spark will shuffle according to the combination of C1 and C2 and then shuffle again by C1 only. This behavior makes sense when C2 is used to salt C1 for skewed data; however, when the double grouping is part of the logic, the cost of doing two shuffles instead of one is not insignificant (and even worse when multiple iterations are done). Is there a way to force the first groupBy to shuffle the data so that the second groupBy occurs within the partition?

Thanks,
Assaf.
forcing dataframe groupby partitioning
Hi,

Consider the following example:

df.groupBy(C1, C2).agg(some agg).groupBy(C1).agg(some more agg)

By default, Spark will shuffle according to the combination of C1 and C2 and then shuffle again by C1 only. This behavior makes sense when C2 is used to salt C1 for skewed data; however, when the double grouping is part of the logic, the cost of doing two shuffles instead of one is not insignificant (and even worse when multiple iterations are done). Is there a way to force the first groupBy to shuffle the data so that the second groupBy occurs within the partition?

Thanks,
Assaf.
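The single-shuffle plan being asked for can be modeled outside Spark: partition the rows once by C1 (which automatically co-locates every (C1, C2) pair for that C1 as well), then run both group-bys locally inside each partition. The following plain-Python sketch is only an illustration of that idea, with an invented aggregation (sum, then count of distinct C2 per C1); it is not Spark code, and whether Spark's planner reuses an explicit repartition on C1 this way depends on the version:

```python
from collections import defaultdict

def two_level_agg(rows, num_partitions=3):
    """One shuffle by C1, then both the (C1, C2) aggregation and the C1
    aggregation performed locally inside each partition."""
    # Single shuffle: every row with the same C1 lands in one partition.
    partitions = defaultdict(list)
    for c1, c2, v in rows:
        partitions[hash(c1) % num_partitions].append((c1, c2, v))

    result = {}
    for part in partitions.values():
        # Local groupBy (C1, C2): sum the values.
        inner = defaultdict(int)
        for c1, c2, v in part:
            inner[(c1, c2)] += v
        # Local groupBy C1 over the inner results: count distinct C2.
        outer = defaultdict(int)
        for (c1, _c2) in inner:
            outer[c1] += 1
        # Safe because each C1 lives in exactly one partition.
        result.update(outer)
    return result

rows = [("a", 1, 10), ("a", 1, 5), ("a", 2, 1), ("b", 9, 2)]
print(two_level_agg(rows))  # {'a': 2, 'b': 1}
```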
How to change Spark DataFrame groupby("col1",..,"coln") into reduceByKey()?
Hi,

I have a Spark job that does a group by, and I can't avoid it because of my use case. I have a large dataset, around 1 TB, which I need to process/update in a DataFrame. The job shuffles a huge amount of data and slows down because of the shuffle and the groupBy. One reason I can see is that my data is skewed: some of my group-by keys are empty. How do I avoid empty group-by keys in a DataFrame? Does DataFrame avoid empty group-by keys? I group by on around 8 keys:

sourceFrame.select("blabla").groupBy("col1","col2","col3",..."col8").agg("bla bla");

How do I change the above code to use reduceByKey()? Can we apply aggregation with reduceByKey()? Please guide. Thanks in advance.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-change-Spark-DataFrame-groupby-col1-coln-into-reduceByKey-tp26998.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
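One concrete way to attack both problems from the question: drop rows whose group-by keys are empty before aggregating (so the empty-key rows never pile up into one giant skewed group), and fold values with a reduceByKey-style running aggregate instead of materializing whole groups. The sketch below is plain Python, not Spark; it uses max as a stand-in aggregate and hypothetical column names, since the question elides the real ones. In Spark the equivalent would be a filter on the key columns before the groupBy, or rdd.map to (key, value) pairs followed by reduceByKey:

```python
def reduce_by_key_max(rows, key_cols, value_col):
    """reduceByKey-style aggregation: skip rows whose key fields are
    empty/None, then fold each value into a running max per composite key."""
    acc = {}
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        if any(k is None or k == "" for k in key):
            continue  # drop empty keys instead of shuffling them into one group
        v = row[value_col]
        acc[key] = v if key not in acc else max(acc[key], v)
    return acc

rows = [
    {"col1": "x", "col2": "p", "score": 3},
    {"col1": "x", "col2": "p", "score": 9},
    {"col1": "",  "col2": "p", "score": 1},   # empty key: dropped
    {"col1": "y", "col2": "q", "score": 4},
]
print(reduce_by_key_max(rows, ["col1", "col2"], "score"))
# {('x', 'p'): 9, ('y', 'q'): 4}
```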
Re: Pyspark - how to use UDFs with dataframe groupby
Short answer: PySpark does not support UDAFs (user-defined aggregate functions) for now.

On Tue, Feb 9, 2016 at 11:44 PM, Viktor ARDELEAN <viktor0...@gmail.com> wrote:
> Hello,
>
> I am using the following transformations on an RDD:
>
> rddAgg = df.map(lambda l: (Row(a=l.a, b=l.b, c=l.c), l))\
>            .aggregateByKey([], lambda accumulatorList, value: accumulatorList + [value],
>                            lambda list1, list2: list1 + list2)
>
> I want to use the DataFrame groupBy + agg transformation instead of map +
> aggregateByKey because, as far as I know, DataFrame transformations are
> faster than RDD transformations.
>
> I just can't figure out how to use custom aggregate functions with agg.
>
> First step is clear:
>
> groupedData = df.groupBy("a", "b", "c")
>
> Second step is not very clear to me:
>
> dfAgg = groupedData.agg(<some custom function that collects values into a list and merges it?>)
>
> The agg documentation says the following:
>
> agg(*exprs)
> <https://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html?highlight=min#pyspark.sql.GroupedData.agg>
>
> Compute aggregates and returns the result as a DataFrame.
>
> The available aggregate functions are avg, max, min, sum, count.
>
> If exprs is a single dict mapping from string to string, then the key is
> the column to perform aggregation on, and the value is the aggregate
> function.
>
> Alternatively, exprs can also be a list of aggregate Column expressions.
>
> Parameters: exprs – a dict mapping from column name (string) to
> aggregate functions (string), or a list of Columns.
>
> Thanks for help!
> --
> Viktor
>
> Don't print this email, unless it's really necessary. Take care of
> the environment.
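Since PySpark lacked UDAF support at the time, the map + aggregateByKey pattern from the question is the usual workaround. Its contract (a zero value, a per-partition seq_op, and a cross-partition comb_op) can be modeled in plain Python over explicit partitions; this is only a sketch of the semantics, not the PySpark implementation:

```python
def aggregate_by_key(partitions, zero, seq_op, comb_op):
    """Plain-Python model of RDD.aggregateByKey over several partitions:
    seq_op folds one value into a partition-local accumulator,
    comb_op merges the per-partition accumulators."""
    merged = {}
    for part in partitions:
        local = {}
        for k, v in part:
            # list(zero) gives each key a fresh copy of the zero value.
            local[k] = seq_op(local.get(k, list(zero)), v)
        for k, acc in local.items():
            merged[k] = comb_op(merged[k], acc) if k in merged else acc
    return merged

parts = [
    [("a", 1), ("b", 3)],   # partition 0
    [("a", 2)],             # partition 1
]
out = aggregate_by_key(
    parts,
    zero=[],
    seq_op=lambda acc, v: acc + [v],   # append a value into the list
    comb_op=lambda l1, l2: l1 + l2,    # flat-merge two partial lists
)
print(out)  # {'a': [1, 2], 'b': [3]}
```

Note that the combiner must concatenate the lists flat (l1 + l2); wrapping them ([l1] + [l2]) would produce nested lists.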
Pyspark - how to use UDFs with dataframe groupby
Hello,

I am using the following transformations on an RDD:

rddAgg = df.map(lambda l: (Row(a=l.a, b=l.b, c=l.c), l))\
           .aggregateByKey([], lambda accumulatorList, value: accumulatorList + [value],
                           lambda list1, list2: list1 + list2)

I want to use the DataFrame groupBy + agg transformation instead of map + aggregateByKey because, as far as I know, DataFrame transformations are faster than RDD transformations.

I just can't figure out how to use custom aggregate functions with agg.

First step is clear:

groupedData = df.groupBy("a", "b", "c")

Second step is not very clear to me:

dfAgg = groupedData.agg(<some custom function that collects values into a list and merges it?>)

The agg documentation says the following:

agg(*exprs)
<https://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html?highlight=min#pyspark.sql.GroupedData.agg>

Compute aggregates and returns the result as a DataFrame.

The available aggregate functions are avg, max, min, sum, count.

If exprs is a single dict mapping from string to string, then the key is the column to perform aggregation on, and the value is the aggregate function.

Alternatively, exprs can also be a list of aggregate Column expressions.

Parameters: exprs – a dict mapping from column name (string) to aggregate functions (string), or a list of Columns.

Thanks for help!
--
Viktor

Don't print this email, unless it's really necessary. Take care of the environment.
Re: How to ignore case in dataframe groupby?
Drop the original column and rename the new column. See df.drop and df.withColumnRenamed.

Eran

On Wed, 30 Dec 2015 at 19:08, raja kbv wrote:
> Solutions from Eran & Yanbo are working well. Thank you.
>
> @Eran,
>
> Your solution worked with a small change:
> DF.withColumn("upper-code", upper(df("countrycode")))
>
> This creates a new column "upper-code". Is there a way to update the
> column in place, or to create a new df with the updated column?
>
> Thanks,
> Raja
>
> On Thursday, 24 December 2015 6:17 PM, Eran Witkon wrote:
>
> Use DF.withColumn("upper-code", df("countrycode").toUpper)
> or just run a map function that does the same.
>
> On Thu, Dec 24, 2015 at 2:05 PM, Bharathi Raja wrote:
>
> Hi,
> Values in a dataframe column named countrycode are in different cases,
> e.g. (US, us). groupBy & count gives two rows, but the requirement is to
> ignore case for this operation.
> 1) Is there a way to ignore case in groupBy? Or
> 2) Is there a way to update the dataframe column countrycode to uppercase?
>
> Thanks in advance.
>
> Regards,
> Raja
Re: How to ignore case in dataframe groupby?
Solutions from Eran & Yanbo are working well. Thank you.

@Eran,

Your solution worked with a small change:
DF.withColumn("upper-code", upper(df("countrycode")))

This creates a new column "upper-code". Is there a way to update the column in place, or to create a new df with the updated column?

Thanks,
Raja

On Thursday, 24 December 2015 6:17 PM, Eran Witkon wrote:

Use DF.withColumn("upper-code", df("countrycode").toUpper)
or just run a map function that does the same.

On Thu, Dec 24, 2015 at 2:05 PM, Bharathi Raja wrote:

Hi,
Values in a dataframe column named countrycode are in different cases, e.g. (US, us). groupBy & count gives two rows, but the requirement is to ignore case for this operation.
1) Is there a way to ignore case in groupBy? Or
2) Is there a way to update the dataframe column countrycode to uppercase?

Thanks in advance.

Regards,
Raja
Re: How to ignore case in dataframe groupby?
You can use DF.groupBy(upper(col("a"))).agg(sum(col("b"))). DataFrame provides the function "upper" to convert a column to uppercase.

2015-12-24 20:47 GMT+08:00 Eran Witkon:
> Use DF.withColumn("upper-code", df("countrycode").toUpper)
> or just run a map function that does the same.
>
> On Thu, Dec 24, 2015 at 2:05 PM, Bharathi Raja wrote:
>
>> Hi,
>> Values in a dataframe column named countrycode are in different cases,
>> e.g. (US, us). groupBy & count gives two rows, but the requirement is to
>> ignore case for this operation.
>> 1) Is there a way to ignore case in groupBy? Or
>> 2) Is there a way to update the dataframe column countrycode to uppercase?
>>
>> Thanks in advance.
>>
>> Regards,
>> Raja
How to ignore case in dataframe groupby?
Hi,

Values in a dataframe column named countrycode are in different cases, e.g. (US, us). groupBy & count gives two rows, but the requirement is to ignore case for this operation.

1) Is there a way to ignore case in groupBy? Or
2) Is there a way to update the dataframe column countrycode to uppercase?

Thanks in advance.

Regards,
Raja
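The fixes suggested in this thread all normalize the key's case before grouping. Stripped of the Spark API, the logic is just the following plain-Python sketch (in Spark it becomes groupBy(upper(col("countrycode"))).count(), as one reply shows):

```python
from collections import Counter

def count_ignore_case(codes):
    """Case-insensitive groupBy + count: normalize the grouping key to
    upper case before counting, so 'US' and 'us' fall into one group."""
    return Counter(code.upper() for code in codes)

print(count_ignore_case(["US", "us", "DE"]))  # Counter({'US': 2, 'DE': 1})
```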
Re: How to ignore case in dataframe groupby?
Use DF.withColumn("upper-code", df("countrycode").toUpper)
or just run a map function that does the same.

On Thu, Dec 24, 2015 at 2:05 PM, Bharathi Raja wrote:
> Hi,
> Values in a dataframe column named countrycode are in different cases,
> e.g. (US, us). groupBy & count gives two rows, but the requirement is to
> ignore case for this operation.
> 1) Is there a way to ignore case in groupBy? Or
> 2) Is there a way to update the dataframe column countrycode to uppercase?
>
> Thanks in advance.
>
> Regards,
> Raja
Re: Spark DataFrame GroupBy into List
collect_set and collect_list are built-in user-defined functions in Hive; see
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF

On 14 October 2015 at 03:45, SLiZn Liu wrote:

> Hi Michael,
>
> Can you be more specific on `collect_set`? Is it a built-in function or,
> if it is a UDF, how is it defined?
>
> BR,
> Todd Leo
>
> On Wed, Oct 14, 2015 at 2:12 AM, Michael Armbrust wrote:
>
>> import org.apache.spark.sql.functions._
>>
>> df.groupBy("category")
>>   .agg(callUDF("collect_set", df("id")).as("id_list"))
>>
>> On Mon, Oct 12, 2015 at 11:08 PM, SLiZn Liu wrote:
>>
>>> Hey Spark users,
>>>
>>> I'm trying to group a dataframe by appending occurrences into a list
>>> instead of counting them.
>>>
>>> Let's say we have a dataframe as shown below:
>>>
>>> | category | id |
>>> | -------- |:--:|
>>> | A        | 1  |
>>> | A        | 2  |
>>> | B        | 3  |
>>> | B        | 4  |
>>> | C        | 5  |
>>>
>>> ideally, after some magic group by (reverse explode?):
>>>
>>> | category | id_list |
>>> | -------- | ------- |
>>> | A        | 1,2     |
>>> | B        | 3,4     |
>>> | C        | 5       |
>>>
>>> any tricks to achieve that? Scala Spark API is preferred. =D
>>>
>>> BR,
>>> Todd Leo
RE: Spark DataFrame GroupBy into List
My guess is the same as the UDAF collect_set in Hive.

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inAggregateFunctions(UDAF)

Yong

From: sliznmail...@gmail.com
Date: Wed, 14 Oct 2015 02:45:48 +
Subject: Re: Spark DataFrame GroupBy into List
To: mich...@databricks.com
CC: user@spark.apache.org

Hi Michael,

Can you be more specific on `collect_set`? Is it a built-in function or, if it is a UDF, how is it defined?

BR,
Todd Leo

On Wed, Oct 14, 2015 at 2:12 AM, Michael Armbrust <mich...@databricks.com> wrote:

import org.apache.spark.sql.functions._

df.groupBy("category")
  .agg(callUDF("collect_set", df("id")).as("id_list"))

On Mon, Oct 12, 2015 at 11:08 PM, SLiZn Liu <sliznmail...@gmail.com> wrote:

Hey Spark users,

I'm trying to group a dataframe by appending occurrences into a list instead of counting them.

Let's say we have a dataframe as shown below:

| category | id |
| -------- |:--:|
| A        | 1  |
| A        | 2  |
| B        | 3  |
| B        | 4  |
| C        | 5  |

ideally, after some magic group by (reverse explode?):

| category | id_list |
| -------- | ------- |
| A        | 1,2     |
| B        | 3,4     |
| C        | 5       |

any tricks to achieve that? Scala Spark API is preferred. =D

BR,
Todd Leo
Re: Spark DataFrame GroupBy into List
That's correct. It is a Hive UDAF.

On Wed, Oct 14, 2015 at 6:45 AM, java8964 <java8...@hotmail.com> wrote:

> My guess is the same as the UDAF collect_set in Hive.
>
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inAggregateFunctions(UDAF)
>
> Yong
>
> --
> From: sliznmail...@gmail.com
> Date: Wed, 14 Oct 2015 02:45:48 +
> Subject: Re: Spark DataFrame GroupBy into List
> To: mich...@databricks.com
> CC: user@spark.apache.org
>
> Hi Michael,
>
> Can you be more specific on `collect_set`? Is it a built-in function or,
> if it is a UDF, how is it defined?
>
> BR,
> Todd Leo
>
> On Wed, Oct 14, 2015 at 2:12 AM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
> import org.apache.spark.sql.functions._
>
> df.groupBy("category")
>   .agg(callUDF("collect_set", df("id")).as("id_list"))
>
> On Mon, Oct 12, 2015 at 11:08 PM, SLiZn Liu <sliznmail...@gmail.com>
> wrote:
>
> Hey Spark users,
>
> I'm trying to group a dataframe by appending occurrences into a list
> instead of counting them.
>
> Let's say we have a dataframe as shown below:
>
> | category | id |
> | -------- |:--:|
> | A        | 1  |
> | A        | 2  |
> | B        | 3  |
> | B        | 4  |
> | C        | 5  |
>
> ideally, after some magic group by (reverse explode?):
>
> | category | id_list |
> | -------- | ------- |
> | A        | 1,2     |
> | B        | 3,4     |
> | C        | 5       |
>
> any tricks to achieve that? Scala Spark API is preferred. =D
>
> BR,
> Todd Leo
Re: Spark DataFrame GroupBy into List
Thanks, Michael and java8964! Does the Hive context also provide a UDF for combining existing lists into a flattened (not nested) list? (list -> list of lists -[flatten]-> list)

On Thu, Oct 15, 2015 at 1:16 AM, Michael Armbrust <mich...@databricks.com> wrote:

> That's correct. It is a Hive UDAF.
>
> On Wed, Oct 14, 2015 at 6:45 AM, java8964 <java8...@hotmail.com> wrote:
>
>> My guess is the same as the UDAF collect_set in Hive.
>>
>> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inAggregateFunctions(UDAF)
>>
>> Yong
>>
>> --
>> From: sliznmail...@gmail.com
>> Date: Wed, 14 Oct 2015 02:45:48 +
>> Subject: Re: Spark DataFrame GroupBy into List
>> To: mich...@databricks.com
>> CC: user@spark.apache.org
>>
>> Hi Michael,
>>
>> Can you be more specific on `collect_set`? Is it a built-in function or,
>> if it is a UDF, how is it defined?
>>
>> BR,
>> Todd Leo
>>
>> On Wed, Oct 14, 2015 at 2:12 AM, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>> import org.apache.spark.sql.functions._
>>
>> df.groupBy("category")
>>   .agg(callUDF("collect_set", df("id")).as("id_list"))
>>
>> On Mon, Oct 12, 2015 at 11:08 PM, SLiZn Liu <sliznmail...@gmail.com>
>> wrote:
>>
>> Hey Spark users,
>>
>> I'm trying to group a dataframe by appending occurrences into a list
>> instead of counting them.
>>
>> Let's say we have a dataframe as shown below:
>>
>> | category | id |
>> | -------- |:--:|
>> | A        | 1  |
>> | A        | 2  |
>> | B        | 3  |
>> | B        | 4  |
>> | C        | 5  |
>>
>> ideally, after some magic group by (reverse explode?):
>>
>> | category | id_list |
>> | -------- | ------- |
>> | A        | 1,2     |
>> | B        | 3,4     |
>> | C        | 5       |
>>
>> any tricks to achieve that? Scala Spark API is preferred. =D
>>
>> BR,
>> Todd Leo
Re: Spark DataFrame GroupBy into List
Hi Michael,

Can you be more specific on `collect_set`? Is it a built-in function or, if it is a UDF, how is it defined?

BR,
Todd Leo

On Wed, Oct 14, 2015 at 2:12 AM, Michael Armbrust wrote:

> import org.apache.spark.sql.functions._
>
> df.groupBy("category")
>   .agg(callUDF("collect_set", df("id")).as("id_list"))
>
> On Mon, Oct 12, 2015 at 11:08 PM, SLiZn Liu wrote:
>
>> Hey Spark users,
>>
>> I'm trying to group a dataframe by appending occurrences into a list
>> instead of counting them.
>>
>> Let's say we have a dataframe as shown below:
>>
>> | category | id |
>> | -------- |:--:|
>> | A        | 1  |
>> | A        | 2  |
>> | B        | 3  |
>> | B        | 4  |
>> | C        | 5  |
>>
>> ideally, after some magic group by (reverse explode?):
>>
>> | category | id_list |
>> | -------- | ------- |
>> | A        | 1,2     |
>> | B        | 3,4     |
>> | C        | 5       |
>>
>> any tricks to achieve that? Scala Spark API is preferred. =D
>>
>> BR,
>> Todd Leo
Re: Spark DataFrame GroupBy into List
Hi Liu,

I could not see any operator on DataFrame which will give the desired result. The DataFrame API, as expected, works on the Row format and a fixed set of operators on it. However, you can achieve the desired result by accessing the internal RDD as below:

val s = Seq(Test("A", 1), Test("A", 2), Test("B", 1), Test("B", 2))
val rdd = testSparkContext.parallelize(s)
val df = snc.createDataFrame(rdd)
val rdd1 = df.rdd.map(p => (Seq(p.getString(0)), Seq(p.getInt(1))))

val reduceF = (p: Seq[Int], q: Seq[Int]) => p ++ q

val rdd3 = rdd1.reduceByKey(reduceF)
rdd3.foreach(r => println(r))

You can always convert the RDD obtained after the transformation and reduce back to a DataFrame.

Regards,
Rishitesh Mishra,
SnappyData (http://www.snappydata.io/)
https://www.linkedin.com/profile/view?id=AAIAAAIFdkMB_v-nolCrFH6_pKf9oH6tZD8Qlgo=nav_responsive_tab_profile

On Tue, Oct 13, 2015 at 11:38 AM, SLiZn Liu wrote:

> Hey Spark users,
>
> I'm trying to group a dataframe by appending occurrences into a list
> instead of counting them.
>
> Let's say we have a dataframe as shown below:
>
> | category | id |
> | -------- |:--:|
> | A        | 1  |
> | A        | 2  |
> | B        | 3  |
> | B        | 4  |
> | C        | 5  |
>
> ideally, after some magic group by (reverse explode?):
>
> | category | id_list |
> | -------- | ------- |
> | A        | 1,2     |
> | B        | 3,4     |
> | C        | 5       |
>
> any tricks to achieve that? Scala Spark API is preferred. =D
>
> BR,
> Todd Leo
Re: Spark DataFrame GroupBy into List
Hi Rishitesh,

I did it with combineByKey, but your solution is clearer and more readable; at least it doesn't require 3 lambda functions to get confused with. Will definitely try it out tomorrow, thanks.

Plus, an OutOfMemoryError keeps bothering me as I read a massive amount of JSON files, whereas the RDD yielded by combineByKey is rather small. Anyway, I'll file another mail to describe this.

BR,
Todd Leo

On Tue, 13 Oct 2015 at 19:05, Rishitesh Mishra wrote:

> Hi Liu,
> I could not see any operator on DataFrame which will give the desired
> result. The DataFrame API, as expected, works on the Row format and a
> fixed set of operators on it. However, you can achieve the desired result
> by accessing the internal RDD as below:
>
> val s = Seq(Test("A", 1), Test("A", 2), Test("B", 1), Test("B", 2))
> val rdd = testSparkContext.parallelize(s)
> val df = snc.createDataFrame(rdd)
> val rdd1 = df.rdd.map(p => (Seq(p.getString(0)), Seq(p.getInt(1))))
>
> val reduceF = (p: Seq[Int], q: Seq[Int]) => p ++ q
>
> val rdd3 = rdd1.reduceByKey(reduceF)
> rdd3.foreach(r => println(r))
>
> You can always convert the RDD obtained after the transformation and reduce
> back to a DataFrame.
>
> Regards,
> Rishitesh Mishra,
> SnappyData (http://www.snappydata.io/)
>
> On Tue, Oct 13, 2015 at 11:38 AM, SLiZn Liu wrote:
>
>> Hey Spark users,
>>
>> I'm trying to group a dataframe by appending occurrences into a list
>> instead of counting them.
>>
>> Let's say we have a dataframe as shown below:
>>
>> | category | id |
>> | -------- |:--:|
>> | A        | 1  |
>> | A        | 2  |
>> | B        | 3  |
>> | B        | 4  |
>> | C        | 5  |
>>
>> ideally, after some magic group by (reverse explode?):
>>
>> | category | id_list |
>> | -------- | ------- |
>> | A        | 1,2     |
>> | B        | 3,4     |
>> | C        | 5       |
>>
>> any tricks to achieve that? Scala Spark API is preferred. =D
>>
>> BR,
>> Todd Leo
Re: Spark DataFrame GroupBy into List
import org.apache.spark.sql.functions._

df.groupBy("category")
  .agg(callUDF("collect_set", df("id")).as("id_list"))

On Mon, Oct 12, 2015 at 11:08 PM, SLiZn Liu wrote:

> Hey Spark users,
>
> I'm trying to group a dataframe by appending occurrences into a list
> instead of counting them.
>
> Let's say we have a dataframe as shown below:
>
> | category | id |
> | -------- |:--:|
> | A        | 1  |
> | A        | 2  |
> | B        | 3  |
> | B        | 4  |
> | C        | 5  |
>
> ideally, after some magic group by (reverse explode?):
>
> | category | id_list |
> | -------- | ------- |
> | A        | 1,2     |
> | B        | 3,4     |
> | C        | 5       |
>
> any tricks to achieve that? Scala Spark API is preferred. =D
>
> BR,
> Todd Leo
Spark DataFrame GroupBy into List
Hey Spark users,

I'm trying to group a dataframe by appending occurrences into a list instead of counting them.

Let's say we have a dataframe as shown below:

| category | id |
| -------- |:--:|
| A        | 1  |
| A        | 2  |
| B        | 3  |
| B        | 4  |
| C        | 5  |

ideally, after some magic group by (reverse explode?):

| category | id_list |
| -------- | ------- |
| A        | 1,2     |
| B        | 3,4     |
| C        | 5       |

any tricks to achieve that? Scala Spark API is preferred. =D

BR,
Todd Leo
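For reference, the "reverse explode" asked for here is a collect-into-list aggregation (what collect_list/collect_set express in Hive and Spark SQL). Stripped of the Spark API, the logic is just the following plain-Python sketch:

```python
from collections import defaultdict

def group_into_lists(rows):
    """'Reverse explode': collect the ids of each category into a list
    instead of counting them."""
    groups = defaultdict(list)
    for category, id_ in rows:
        groups[category].append(id_)
    return dict(groups)

rows = [("A", 1), ("A", 2), ("B", 3), ("B", 4), ("C", 5)]
print(group_into_lists(rows))  # {'A': [1, 2], 'B': [3, 4], 'C': [5]}
```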
Re: DataFrame groupBy vs RDD groupBy
Hi Michael,

This is great info. I am currently using the repartitionAndSortWithinPartitions function to achieve the same. Is this the recommended way until 1.3, or is there a better way?

On 23 May 2015 07:38, Michael Armbrust <mich...@databricks.com> wrote:

DataFrames have a lot more information about the data, so there is a whole class of optimizations that are possible there that we cannot do in RDDs. This is why we are focusing a lot of effort on this part of the project.

In Spark 1.4 you can accomplish what you want using the new window function feature. This can be done with SQL as you described, or directly on a DataFrame:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val df = Seq(("a", 1), ("b", 1), ("c", 2), ("d", 2)).toDF("x", "y")
df.select('x, 'y, rowNumber.over(Window.partitionBy("y").orderBy("x")).as("number")).show()

+---+---+------+
|  x|  y|number|
+---+---+------+
|  a|  1|     1|
|  b|  1|     2|
|  c|  2|     1|
|  d|  2|     2|
+---+---+------+

On Fri, May 22, 2015 at 3:35 AM, gtanguy <g.tanguy.claravi...@gmail.com> wrote:

Hello everybody,

I have two questions in one. I upgraded from Spark 1.1 to 1.3 and some parts of my code using groupBy became really slow.

1/ Why is the groupBy of an RDD so slow in comparison to the groupBy of a DataFrame?

// DataFrame: runs in a few seconds
val result = table.groupBy("col1").count

// RDD: takes hours with a lot of spilling in-memory
val schemaOriginel = table.schema
val result = table.rdd.groupBy { r =>
  val rs = RowSchema(r, schemaOriginel)
  val col1 = rs.getValueByName("col1")
  col1
}.map(l => (l._1, l._2.size)).count()

2/ My goal is to groupBy on a key, then to order each group over a column and finally to add the row number within each group. I had this code running before changing to Spark 1.3 and it worked fine, but since I changed to DataFrame it is really slow.

val schemaOriginel = table.schema
val result = table.rdd.groupBy { r =>
  val rs = RowSchema(r, schemaOriginel)
  val col1 = rs.getValueByName("col1")
  col1
}.flatMap { l =>
  l._2.toList
    .sortBy { u =>
      val rs = RowSchema(u, schemaOriginel)
      val col1 = rs.getValueByName("col1")
      val col2 = rs.getValueByName("col2")
      (col1, col2)
    }
    .zipWithIndex
}

I think the SQL equivalent of what I try to do is:

SELECT a, ROW_NUMBER() OVER (PARTITION BY a) AS num FROM table

I don't think I can do this with a GroupedData (the result of df.groupBy). Any ideas on how I can speed this up?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-groupBy-vs-RDD-groupBy-tp22995.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: DataFrame groupBy vs RDD groupBy
DataFrames have a lot more information about the data, so there is a whole class of optimizations that are possible there that we cannot do in RDDs. This is why we are focusing a lot of effort on this part of the project.

In Spark 1.4 you can accomplish what you want using the new window function feature. This can be done with SQL as you described, or directly on a DataFrame:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val df = Seq(("a", 1), ("b", 1), ("c", 2), ("d", 2)).toDF("x", "y")
df.select('x, 'y, rowNumber.over(Window.partitionBy("y").orderBy("x")).as("number")).show()

+---+---+------+
|  x|  y|number|
+---+---+------+
|  a|  1|     1|
|  b|  1|     2|
|  c|  2|     1|
|  d|  2|     2|
+---+---+------+

On Fri, May 22, 2015 at 3:35 AM, gtanguy <g.tanguy.claravi...@gmail.com> wrote:

Hello everybody,

I have two questions in one. I upgraded from Spark 1.1 to 1.3 and some parts of my code using groupBy became really slow.

1/ Why is the groupBy of an RDD so slow in comparison to the groupBy of a DataFrame?

// DataFrame: runs in a few seconds
val result = table.groupBy("col1").count

// RDD: takes hours with a lot of spilling in-memory
val schemaOriginel = table.schema
val result = table.rdd.groupBy { r =>
  val rs = RowSchema(r, schemaOriginel)
  val col1 = rs.getValueByName("col1")
  col1
}.map(l => (l._1, l._2.size)).count()

2/ My goal is to groupBy on a key, then to order each group over a column and finally to add the row number within each group. I had this code running before changing to Spark 1.3 and it worked fine, but since I changed to DataFrame it is really slow.

val schemaOriginel = table.schema
val result = table.rdd.groupBy { r =>
  val rs = RowSchema(r, schemaOriginel)
  val col1 = rs.getValueByName("col1")
  col1
}.flatMap { l =>
  l._2.toList
    .sortBy { u =>
      val rs = RowSchema(u, schemaOriginel)
      val col1 = rs.getValueByName("col1")
      val col2 = rs.getValueByName("col2")
      (col1, col2)
    }
    .zipWithIndex
}

I think the SQL equivalent of what I try to do is:

SELECT a, ROW_NUMBER() OVER (PARTITION BY a) AS num FROM table

I don't think I can do this with a GroupedData (the result of df.groupBy). Any ideas on how I can speed this up?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-groupBy-vs-RDD-groupBy-tp22995.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: DataFrame groupBy MapType
Thanks Michael. Will submit a ticket.

Justin

On Mon, Apr 6, 2015 at 1:53 PM, Michael Armbrust <mich...@databricks.com> wrote:

I'll add that I don't think there is a convenient way to do this in the Column API ATM, but I would welcome a JIRA for adding it :)

On Mon, Apr 6, 2015 at 1:45 PM, Michael Armbrust <mich...@databricks.com> wrote:

In HiveQL, you should be able to express this as:

SELECT ... FROM table GROUP BY m['SomeKey']

On Sat, Apr 4, 2015 at 5:25 PM, Justin Yip <yipjus...@prediction.io> wrote:

Hello,

I have a case class like this:

case class A(
  m: Map[Long, Long],
  ...
)

and constructed a DataFrame from a Seq[A]. I would like to perform a groupBy on A.m("SomeKey"). I can implement a UDF, create a new Column, and then invoke a groupBy on the new Column. But is that the idiomatic way of doing such an operation? I can't find much info about operating on a MapType Column in the docs.

Thanks ahead!
Justin
Re: DataFrame groupBy MapType
In HiveQL, you should be able to express this as:

SELECT ... FROM table GROUP BY m['SomeKey']

On Sat, Apr 4, 2015 at 5:25 PM, Justin Yip <yipjus...@prediction.io> wrote:

Hello,

I have a case class like this:

case class A(
  m: Map[Long, Long],
  ...
)

and constructed a DataFrame from a Seq[A]. I would like to perform a groupBy on A.m("SomeKey"). I can implement a UDF, create a new Column, and then invoke a groupBy on the new Column. But is that the idiomatic way of doing such an operation? I can't find much info about operating on a MapType Column in the docs.

Thanks ahead!
Justin
Re: DataFrame groupBy MapType
I'll add that I don't think there is a convenient way to do this in the Column API ATM, but I would welcome a JIRA for adding it :)

On Mon, Apr 6, 2015 at 1:45 PM, Michael Armbrust <mich...@databricks.com> wrote:

In HiveQL, you should be able to express this as:

SELECT ... FROM table GROUP BY m['SomeKey']

On Sat, Apr 4, 2015 at 5:25 PM, Justin Yip <yipjus...@prediction.io> wrote:

Hello,

I have a case class like this:

case class A(
  m: Map[Long, Long],
  ...
)

and constructed a DataFrame from a Seq[A]. I would like to perform a groupBy on A.m("SomeKey"). I can implement a UDF, create a new Column, and then invoke a groupBy on the new Column. But is that the idiomatic way of doing such an operation? I can't find much info about operating on a MapType Column in the docs.

Thanks ahead!
Justin
DataFrame groupBy MapType
Hello,

I have a case class like this:

case class A(
  m: Map[Long, Long],
  ...
)

and constructed a DataFrame from a Seq[A]. I would like to perform a groupBy on A.m("SomeKey"). I can implement a UDF, create a new Column, and then invoke a groupBy on the new Column. But is that the idiomatic way of doing such an operation? I can't find much info about operating on a MapType Column in the docs.

Thanks ahead!
Justin
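The HiveQL answer suggested in this thread groups on a single entry of the map column (GROUP BY m['SomeKey']). A plain-Python sketch of that semantics, using integer keys to match Map[Long, Long] and counting rows per extracted key (the function name and sample data are illustrative, not from the thread):

```python
from collections import Counter

def group_by_map_key(rows, key):
    """Group rows by one entry of a map-typed column and count them,
    the way GROUP BY m['SomeKey'] would in HiveQL."""
    return Counter(row["m"].get(key) for row in rows)

rows = [{"m": {7: 1}}, {"m": {7: 1}}, {"m": {7: 2}}]
print(group_by_map_key(rows, 7))  # Counter({1: 2, 2: 1})
```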
DataFrame GroupBy
Hello everybody, I am trying to do a simple groupBy:

*Code:*
val df = hiveContext.sql("SELECT * FROM table1")
df.printSchema()
df.groupBy("customer_id").count().show(5)

*Stacktrace:*
root
 |-- customer_id: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- reco_material_id: string (nullable = true)
 |-- score: string (nullable = true)
 |-- category: string (nullable = true)
 |-- is_achat: string (nullable = true)
15/03/26 17:19:29 INFO HiveMetaStore: 0: get_table : db=default tbl=table1
15/03/26 17:19:29 INFO audit: ugi=spark ip=unknown-ip-addr cmd=get_table : db=default tbl=table1
Exception in thread "main" java.util.NoSuchElementException: key not found: customer_id#0
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at org.apache.spark.sql.catalyst.expressions.AttributeMap.default(AttributeMap.scala:29)
    at scala.collection.MapLike$class.apply(MapLike.scala:141)
    at org.apache.spark.sql.catalyst.expressions.AttributeMap.apply(AttributeMap.scala:29)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.hive.execution.HiveTableScan.<init>(HiveTableScan.scala:53)
    at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$15.apply(HiveStrategies.scala:216)
    at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$15.apply(HiveStrategies.scala:216)
    at org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:1034)
    at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$.apply(HiveStrategies.scala:212)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
    at org.apache.spark.sql.execution.SparkStrategies$HashAggregation$.apply(SparkStrategies.scala:152)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
    at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:290)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:1081)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:1079)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:1085)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:1085)
    at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:815)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:758)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:809)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:314)

Does anyone have an idea?

Regards,
Germain Tanguy.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-GroupBy-tp22242.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
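For what it's worth, the query being attempted above is an ordinary GROUP BY count. As a sanity check of the intended semantics (independent of the planner error), here it is as plain SQL against an in-memory SQLite table; the rows are made up, and the table and column names are taken from the post.

```python
import sqlite3

# The same aggregation as df.groupBy("customer_id").count(), expressed as
# plain SQL against an in-memory SQLite table (made-up rows).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE table1 (customer_id TEXT, score TEXT)")
conn.executemany(
    "INSERT INTO table1 VALUES (?, ?)",
    [("c1", "0.9"), ("c1", "0.7"), ("c2", "0.5")],
)

rows = conn.execute(
    "SELECT customer_id, COUNT(*) FROM table1 "
    "GROUP BY customer_id ORDER BY customer_id"
).fetchall()
print(rows)  # [('c1', 2), ('c2', 1)]
conn.close()
```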
Dataframe groupby custom functions (python)
Hi all,

I have been trying out the new dataframe api in 1.3, which looks great by the way. I have found an example of how to define udfs and add them to select operations, like this:

slen = F.udf(lambda s: len(s), IntegerType())
df.select(df.age, slen(df.name).alias('slen')).collect()

Is it possible to do something similar with aggregates? Something like this:

gdf = df.groupBy(df.name)
gdf.agg(slen(df.age)).collect()

thanks,

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Dataframe-groupby-custom-functions-python-tp22205.html
Re: Dataframe groupby custom functions (python)
The only UDAFs that we support today are those defined using the Hive UDAF API. Otherwise you'll have to drop into Spark operations. I'd suggest opening a JIRA.

On Tue, Mar 24, 2015 at 10:49 AM, jamborta <jambo...@gmail.com> wrote:
> Hi all, I have been trying out the new dataframe api in 1.3 [...]
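To make "drop into Spark operations" concrete: the usual workaround at the time was to key the rows yourself and fold each group with an ordinary Python function (roughly df.rdd mapped to (name, value) pairs, then reduceByKey). The sketch below imitates that shape in plain Python with made-up data; it is not PySpark code, and the aggregate (summing string lengths per group) is just a stand-in for whatever custom function you need.

```python
from collections import defaultdict

# Imitates rdd.map(lambda r: (r.name, f(r.age))).reduceByKey(add): key each
# row by name, apply the custom function, and fold per group (made-up rows).
rows = [("alice", "31"), ("bob", "9"), ("alice", "100")]

totals = defaultdict(int)
for name, age in rows:
    totals[name] += len(age)  # custom per-group aggregate

print(dict(totals))  # {'alice': 5, 'bob': 1}
```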