[ https://issues.apache.org/jira/browse/SPARK-28006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16861559#comment-16861559 ]
Li Jin commented on SPARK-28006:
--------------------------------

cc [~hyukjin.kwon] [~LI,Xiao] [~ueshin] [~bryanc] I think code-wise this is pretty simple, but since this adds a new pandas udf type I'd like to get some feedback on it.

> User-defined grouped transform pandas_udf for window operations
> ---------------------------------------------------------------
>
>                 Key: SPARK-28006
>                 URL: https://issues.apache.org/jira/browse/SPARK-28006
>             Project: Spark
>          Issue Type: New Feature
>          Components: PySpark
>    Affects Versions: 2.4.3
>            Reporter: Li Jin
>            Priority: Major
>
> Currently, pandas_udf supports a "grouped aggregate" type that can be used with bounded and unbounded windows. There is another set of use cases that can benefit from a "grouped transform" type of pandas_udf.
>
> A grouped transform is an N -> N mapping over a group. For example, "compute the zscore of the values in the group using the group mean and group stdev", or "rank the values in the group".
>
> Currently, to do this, the user needs to use "grouped apply", for example:
>
> {code:python}
> @pandas_udf(schema, GROUPED_MAP)
> def zscore(pdf):
>     v = pdf['v']
>     pdf['v'] = (v - v.mean()) / v.std()
>     return pdf
>
> df.groupby('id').apply(zscore){code}
> This approach has a few downsides:
>
> * Specifying the full return schema is complicated for the user, even though the function only changes one column.
> * Hard-coding the column name 'v' inside the udf makes the udf less reusable.
> * The entire dataframe is serialized and passed to Python, although only one column is needed.
>
> Here we propose a new type of pandas_udf for these use cases:
> {code:python}
> @pandas_udf('double', GROUPED_XFORM)
> def zscore(v):
>     return (v - v.mean()) / v.std()
>
> w = Window.partitionBy('id')
> df = df.withColumn('v_zscore', zscore(df['v']).over(w)){code}
> This addresses the downsides above:
>
> * The user only needs to specify the output type of a single column.
> * The column being zscored is decoupled from the udf implementation.
> * We only need to send one column to the Python worker and concat the result with the original dataframe (this is what grouped aggregate does already).


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
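[Editor's note] The proposed N -> N "grouped transform" semantics mirror pandas' own groupby().transform, which already exists today. A minimal pure-pandas sketch of the same zscore, useful for checking the intended behavior locally (the DataFrame contents here are illustrative only, not from the issue):

```python
import pandas as pd

def zscore(v):
    # N -> N transform: standardize values within each group.
    # Note the parentheses; without them, operator precedence would
    # compute v - (v.mean() / v.std()) instead.
    return (v - v.mean()) / v.std()

pdf = pd.DataFrame({'id': [1, 1, 1, 2, 2, 2],
                    'v': [1.0, 2.0, 3.0, 10.0, 20.0, 30.0]})

# Pandas analogue of df.withColumn('v_zscore', zscore(df['v']).over(w))
# with w = Window.partitionBy('id'): transform keeps one row per input row.
pdf['v_zscore'] = pdf.groupby('id')['v'].transform(zscore)
```

Because transform preserves the group's length and index, the result aligns row-for-row with the original frame, which is exactly the property that distinguishes the proposed GROUPED_XFORM from a grouped aggregate (N -> 1) or grouped map (full-dataframe in, full-dataframe out).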