[ https://issues.apache.org/jira/browse/SPARK-28006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16863186#comment-16863186 ]
Li Jin edited comment on SPARK-28006 at 6/13/19 3:36 PM:
---------------------------------------------------------

Hi [~viirya], good questions!

>> Can we use pandas agg udfs as window function?

Yes, pandas agg udfs are supported as window functions, with both unbounded and bounded windows.

>> Because the proposed GROUPED_XFORM udf calculates output values for all rows
>> in the group, looks like the proposed GROUPED_XFORM udf can only use window
>> frame (UnboundedPreceding, UnboundedFollowing)

This is correct. It is really using the unbounded window as the group here (because there is no groupby transform API in Spark SQL).
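For concreteness, a minimal sketch of what the answer above describes, assuming PySpark 2.4's PandasUDFType.GROUPED_AGG and a dataframe df with columns 'id' and 'v' (the names mean_udf, w_unbounded, and w_bounded are illustrative):

{code:java}
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Grouped-aggregate udf: reduces the pd.Series for each window frame to a scalar.
@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

# Unbounded frame: one mean per partition, broadcast to every row in it.
w_unbounded = Window.partitionBy('id').rowsBetween(
    Window.unboundedPreceding, Window.unboundedFollowing)

# Bounded frame: a rolling mean over the current row and the two before it.
w_bounded = Window.partitionBy('id').orderBy('v').rowsBetween(-2, 0)

df.withColumn('mean_v', mean_udf(df['v']).over(w_unbounded))
df.withColumn('rolling_mean_v', mean_udf(df['v']).over(w_bounded))
{code}

A grouped-aggregate udf reduces each frame to a scalar, which is why it composes with any frame; the grouped transform proposed below is an N -> N mapping, which only makes sense over the whole group.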
> User-defined grouped transform pandas_udf for window operations
> ---------------------------------------------------------------
>
>                 Key: SPARK-28006
>                 URL: https://issues.apache.org/jira/browse/SPARK-28006
>             Project: Spark
>          Issue Type: New Feature
>          Components: PySpark
>    Affects Versions: 2.4.3
>            Reporter: Li Jin
>            Priority: Major
>
> Currently, pandas_udf supports a "grouped aggregate" type that can be used with
> unbounded and bounded windows. There is another set of use cases that can
> benefit from a "grouped transform" type of pandas_udf.
> A grouped transform is defined as an N -> N mapping over a group. For example,
> "compute the zscore for values in the group using the grouped mean and grouped
> stdev", or "rank the values in the group".
> Currently, in order to do this, the user needs to use "grouped apply", for
> example:
> {code:java}
> from pyspark.sql.functions import pandas_udf, PandasUDFType
>
> # The full output schema must be spelled out, even though only 'v' changes.
> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
> def subtract_mean(pdf):
>     v = pdf['v']
>     pdf['v'] = v - v.mean()
>     return pdf
>
> df.groupby('id').apply(subtract_mean)
> # +---+----+
> # | id|   v|
> # +---+----+
> # |  1|-0.5|
> # |  1| 0.5|
> # |  2|-3.0|
> # |  2|-1.0|
> # |  2| 4.0|
> # +---+----+{code}
> This approach has a few downsides:
> * Specifying the full return schema is complicated for the user, although the
> function only changes one column.
> * Hard-coding the column name 'v' inside the udf makes the udf less reusable.
> * The entire dataframe is serialized and passed to Python, although only one
> column is needed.
> Here we propose a new type of pandas_udf to work with these types of use
> cases:
> {code:java}
> from pyspark.sql import Window
>
> df = spark.createDataFrame(
>     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
>     ("id", "v"))
>
> # GROUPED_XFORM is the new udf type proposed in this ticket.
> @pandas_udf('double', GROUPED_XFORM)
> def subtract_mean(v):
>     return v - v.mean()
>
> w = Window.partitionBy('id')
> df = df.withColumn('v', subtract_mean(df['v']).over(w))
> # +---+----+
> # | id|   v|
> # +---+----+
> # |  1|-0.5|
> # |  1| 0.5|
> # |  2|-3.0|
> # |  2|-1.0|
> # |  2| 4.0|
> # +---+----+{code}
> This addresses the above downsides:
> * The user only needs to specify the output type of a single column.
> * The column being zscored is decoupled from the udf implementation.
> * We only need to send one column to the Python worker and concat the result
> with the original dataframe (this is what grouped aggregate does already).
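For illustration, the zscore use case mentioned in the description would look roughly as follows under the proposed API. This is a sketch only: GROUPED_XFORM is the proposed, not-yet-existing udf type, and the names zscore and v_zscore are illustrative.

{code:java}
from pyspark.sql import Window

# Sketch only: GROUPED_XFORM is the udf type proposed in this ticket and
# does not exist in any released Spark version.
@pandas_udf('double', GROUPED_XFORM)
def zscore(v):
    return (v - v.mean()) / v.std()

w = Window.partitionBy('id')
df = df.withColumn('v_zscore', zscore(df['v']).over(w))
{code}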