Is it possible to use the aggregateByKey function available at the RDD level to do similar stuff?
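(For reference, a minimal sketch of that idea, not from the original thread: an RDD-level aggregateByKey can emulate a per-group sum without a UDAF, mirroring the sum UDAF example later in the thread. The column names "g" and "item" below are illustrative assumptions.)

# Hedged sketch: per-group sum with RDD.aggregateByKey instead of a UDAF.
# Assumes df has a grouping column "g" and a double column "item".
pairs = df.rdd.map(lambda row: (row["g"], row["item"]))
sums = pairs.aggregateByKey(
    0.0,                     # zero value: the initial per-partition buffer
    lambda acc, x: acc + x,  # seqOp: fold one value into the buffer
    lambda a, b: a + b)      # combOp: merge buffers across partitions
sums.toDF(["g", "item_sum"]).show()  # back to DataFrame land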
On 19 Oct 2016 04:41, "Tobi Bosede" <ani.to...@gmail.com> wrote:

Thanks Assaf.

This is a lot more complicated than I was expecting; I might end up using collect if the data fits in memory. I was also thinking about using the pivot function in pandas, but that wouldn't run in parallel and so would be even less efficient than collect.

On Tue, Oct 18, 2016 at 7:24 AM, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:

A simple example:

We have a Scala file:

package com.myorg.example

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.functions.{rand, sum}
import org.apache.spark.sql.types.{DataType, DoubleType, StructField, StructType}

class PerformSumUDAF() extends UserDefinedAggregateFunction {

  def inputSchema: StructType = StructType(Array(StructField("item", DoubleType)))

  def bufferSchema: StructType = StructType(Array(StructField("sum", DoubleType)))

  def dataType: DataType = DoubleType

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0.toDouble
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getDouble(0) + input.getDouble(0)
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
  }

  def evaluate(buffer: Row): Any = {
    buffer.getDouble(0)
  }
}

We place the file under myroot/src/main/scala/com/myorg/example/ExampleUDAF.scala

Under myroot we create a pom file (sorry for not cleaning it up; it includes some dependencies you probably don't need, like guava and avro):

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>example packages</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>19.0</version>
    </dependency>
    <dependency>
      <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.0.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
      <version>9.4.1208</version>
    </dependency>
    <dependency>
      <groupId>com.databricks</groupId>
      <artifactId>spark-avro_2.11</artifactId>
      <version>3.0.0-preview2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.0.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.8</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <relocations>
                <relocation>
                  <pattern>com.google.common</pattern>
                  <shadedPattern>com.myorg.shaded.com.google.common</shadedPattern>
                </relocation>
              </relocations>
              <finalName>simple-project-1.0-jar-with-dependencies</finalName>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

Now you can compile the Scala like so: mvn clean install (I assume you have Maven installed).

Now we want to call this from Python (assuming spark is your Spark session):

# get a reference dataframe to do the example on:
df = spark.range(20)

# get the jvm pointer
jvm = spark.sparkContext._gateway.jvm

# import the class
from py4j.java_gateway import java_import
java_import(jvm, "com.myorg.example.PerformSumUDAF")

# create an object from the class:
udafObj = jvm.com.myorg.example.PerformSumUDAF()

# define a python function to do the aggregation.
from pyspark.sql.column import Column, _to_java_column, _to_seq

def pythonudaf(c):
    # the _to_seq portion is because we need to convert this to a
    # sequence of input columns the way scala (java) expects them.
    # The returned value must then be converted to a pyspark Column.
    return Column(udafObj.apply(_to_seq(spark.sparkContext, [c], _to_java_column)))

# now let's use the function
df.agg(pythonudaf(df.id)).show()

Lastly, when you run, make sure to pass the jar created from the Scala build to both --jars and --driver-class-path, so that it is available on all nodes.
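(Two small, hedged extensions of the wrapper above, not from Assaf's message. The SQL-registration variant goes through spark._jsparkSession, a private PySpark handle to the Java SparkSession, and the name "perform_sum" is an arbitrary choice, so treat it as a sketch that may break across Spark versions.)

# Sketch: the wrapped UDAF also works per group, not just over the whole frame.
df.groupBy((df.id % 3).alias("g")).agg(pythonudaf(df.id)).show()

# Sketch: register the UDAF under a SQL name through the JVM session so it
# can also be called from SQL strings. _jsparkSession is private API.
spark._jsparkSession.udf().register("perform_sum", udafObj)
df.createOrReplaceTempView("t")
spark.sql("SELECT perform_sum(id) AS total FROM t").show()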
From: Tobi Bosede [mailto:ani.to...@gmail.com]
Sent: Monday, October 17, 2016 10:15 PM
To: Mendelson, Assaf
Cc: Holden Karau; user
Subject: Re: Aggregate UDF (UDAF) in Python

Thanks Assaf. Yes, please provide an example of how to wrap the code for Python. I am leaning towards Scala.

On Mon, Oct 17, 2016 at 1:50 PM, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:

A possible (bad) workaround would be to use the collect_list function. This will give you all the values in an array (list), and you can then create a UDF to do the aggregation yourself. This would be very slow and cost a lot of memory, but it would work if your cluster can handle it.

This is the only workaround I can think of; otherwise you will need to write the UDAF in Java/Scala and wrap it for Python use. If you need an example of how to do so, I can provide one.

Assaf.
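(A rough PySpark sketch of that collect_list workaround, not from the original message, aimed at the IQR use case from the start of the thread; the column names "g" and "v" are illustrative, and numpy is assumed to be available on the executors.)

import numpy as np
from pyspark.sql.functions import collect_list, udf
from pyspark.sql.types import DoubleType

# UDF over the collected values: inter-quartile range of one group.
iqr = udf(lambda vals: float(np.percentile(vals, 75) - np.percentile(vals, 25)),
          DoubleType())

# collect_list gathers every value of a group into one array, so each group
# must fit in a single task's memory -- hence a "bad" workaround.
df.groupBy("g").agg(iqr(collect_list("v")).alias("iqr")).show()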
From: Tobi Bosede [mailto:ani.to...@gmail.com]
Sent: Sunday, October 16, 2016 7:49 PM
To: Holden Karau
Cc: user
Subject: Re: Aggregate UDF (UDAF) in Python

OK, I misread the year on the dev list. Can you comment on workarounds? (I.e., the question about whether Scala/Java are the only option.)

On Sun, Oct 16, 2016 at 12:09 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

The comment on the developer list is from earlier this week. I'm not sure why UDAF support hasn't made the hop to Python - while I work a fair amount on PySpark, it's mostly in core & ML and not a lot with SQL, so there could be good reasons I'm just not familiar with. We can try pinging Davies or Michael on the JIRA to see what their thoughts are.

On Sunday, October 16, 2016, Tobi Bosede <ani.to...@gmail.com> wrote:

Thanks for the info Holden.

So it seems both the JIRA and the comment on the developer list are over a year old. More surprising, the JIRA has no assignee. Any particular reason for the lack of activity in this area?

Is writing Scala/Java the only workaround for this? I hear a lot of people say Python is the gateway language to Scala. It is because of issues like this that people use Scala for Spark rather than Python, or eventually abandon Python for Scala. It just takes too long for features to get ported over from Scala/Java.

On Sun, Oct 16, 2016 at 8:42 AM, Holden Karau <hol...@pigscanfly.ca> wrote:

I don't believe UDAFs are available in PySpark, as this came up on the developer list while I was asking what features people were missing in PySpark - see http://apache-spark-developers-list.1001551.n3.nabble.com/Python-Spark-Improvements-forked-from-Spark-Improvement-Proposals-td19422.html. The JIRA tracking this issue is at https://issues.apache.org/jira/browse/SPARK-10915

On Sat, Oct 15, 2016 at 7:20 PM, Tobi Bosede <ani.to...@gmail.com> wrote:

Hello,

I am trying to use a UDF that calculates the inter-quartile range (IQR) for pivot() and SQL in PySpark, and got the error that my function wasn't an aggregate function in both scenarios. Does anyone know if UDAF functionality is available in Python? If not, what can I do as a workaround?

Thanks,
Tobi

--
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau