[ https://issues.apache.org/jira/browse/SPARK-28006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16861646#comment-16861646 ]
Hyukjin Kwon commented on SPARK-28006:
--------------------------------------

The proposal itself makes sense to me from a cursory look. One concern is that I don't think Spark has this type of window function. cc [~hvanhovell] as well.

If I understood correctly, I suspect the output is the same as that of our grouped map Pandas UDF? It might be helpful to show the output so that non-Python folks can understand how it works as well :-).

> User-defined grouped transform pandas_udf for window operations
> ---------------------------------------------------------------
>
>                 Key: SPARK-28006
>                 URL: https://issues.apache.org/jira/browse/SPARK-28006
>             Project: Spark
>          Issue Type: New Feature
>          Components: PySpark
>    Affects Versions: 2.4.3
>            Reporter: Li Jin
>            Priority: Major
>
> Currently, pandas_udf supports the "grouped aggregate" type, which can be used
> with bounded and unbounded windows. There is another set of use cases that
> could benefit from a "grouped transform" type pandas_udf.
> Grouped transform is defined as an N -> N mapping over a group. For example,
> "compute the z-score for values in the group using the grouped mean and grouped
> stdev", or "rank the values in the group".
>
> Currently, in order to do this, the user needs to use "grouped apply", for
> example:
>
> {code:python}
> from pyspark.sql.functions import pandas_udf, PandasUDFType
>
> # schema: the full output schema (every column of pdf) must be spelled out
> @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
> def zscore(pdf):
>     v = pdf['v']
>     pdf['v'] = (v - v.mean()) / v.std()
>     return pdf
>
> df.groupby('id').apply(zscore)
> {code}
> This approach has a few downsides:
>
> * Specifying the full return schema is complicated for the user, although the
> function only changes one column.
> * Hard-coding the column name 'v' inside the UDF makes the UDF less reusable.
> * The entire DataFrame is serialized and passed to Python, although only one
> column is needed.
> Here we propose a new type of pandas_udf for these use cases:
> {code:python}
> from pyspark.sql.window import Window
>
> # GROUPED_XFORM is the proposed type; it is not an existing PandasUDFType member
> @pandas_udf('double', GROUPED_XFORM)
> def zscore(v):
>     return (v - v.mean()) / v.std()
>
> w = Window.partitionBy('id')
> df = df.withColumn('v_zscore', zscore(df['v']).over(w))
> {code}
> This addresses the downsides above:
> * The user only needs to specify the output type of a single column.
> * The column being z-scored is decoupled from the UDF implementation.
> * Only one column needs to be sent to the Python worker, and the result is
> concatenated with the original DataFrame (this is what grouped aggregate
> already does).
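To show the kind of output asked about above, here is a minimal local sketch in plain pandas rather than Spark. The sample data is hypothetical, and `groupby(...).transform` and `groupby(...).rank` are only pandas analogies for what the proposed `GROUPED_XFORM` UDF over `Window.partitionBy('id')` would return; this is not a PySpark implementation. Each input row gets exactly one output value, computed from the rows of its own group (N -> N). A grouped-map style `apply` is included to check whether it yields the same values for the transformed column.

```python
import pandas as pd

# Hypothetical sample data (not from the issue): two groups keyed by 'id'.
df = pd.DataFrame({'id': [1, 1, 1, 2, 2],
                   'v': [1.0, 2.0, 3.0, 10.0, 20.0]})


def zscore(v: pd.Series) -> pd.Series:
    # N -> N mapping: returns one value per input row of the group.
    # pandas' Series.std() is the sample standard deviation (ddof=1).
    return (v - v.mean()) / v.std()


# Grouped transform: per-row results are aligned back to the original rows,
# the shape of output a zscore(df['v']).over(Window.partitionBy('id'))
# call would be expected to produce under the proposal.
df['v_zscore'] = df.groupby('id')['v'].transform(zscore)

# "Rank the values in the group" is another N -> N transform.
df['v_rank'] = df.groupby('id')['v'].rank()

# Grouped-map style: apply the function per group and concatenate.
# group_keys=False keeps the original row index, so results compare row by row.
mapped = df.groupby('id', group_keys=False)['v'].apply(zscore).sort_index()

print(df)
# v_zscore is [-1.0, 0.0, 1.0, about -0.7071, about 0.7071]
# v_rank is [1.0, 2.0, 3.0, 1.0, 2.0]
```

For `id = 1` the mean is 2 and the sample stdev is 1, so the z-scores are -1, 0 and 1; for `id = 2` they are -5/sqrt(50) and 5/sqrt(50), about -0.707 and 0.707. In this sketch the grouped-map `apply` produces the same values for `v`; the difference in the proposal lies in what must be declared (one output type instead of a full schema) and what is shipped to Python (one column instead of the whole DataFrame).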