[ 
https://issues.apache.org/jira/browse/SPARK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16329177#comment-16329177
 ] 

Apache Spark commented on SPARK-23011:
--------------------------------------

User 'icexelloss' has created a pull request for this issue:
https://github.com/apache/spark/pull/20295

> Support alternative function form with group aggregate pandas UDF
> -----------------------------------------------------------------
>
>                 Key: SPARK-23011
>                 URL: https://issues.apache.org/jira/browse/SPARK-23011
>             Project: Spark
>          Issue Type: Sub-task
>          Components: PySpark
>    Affects Versions: 2.3.0
>            Reporter: Li Jin
>            Priority: Major
>
> Under the current semantics of groupby apply, the output schema of groupby 
> apply is the same as the output schema of the UDF. Because the grouping 
> column is usually useful to users, they often need to output the grouping 
> columns from the UDF. To illustrate, consider the following example:
> {code:python}
> import pandas as pd
> import statsmodels.api as sm
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> # df has four columns: id, y, x1, x2
> group_column = 'id'
> y_column = 'y'
> x_columns = ['x1', 'x2']
> schema = df.select(group_column, *x_columns).schema
> # Input/output are both a pandas.DataFrame
> @pandas_udf(schema, PandasUDFType.GROUP_MAP)
> def ols(pdf):
>     # The UDF has to recover the grouping key from the data itself
>     group_key = pdf[group_column].iloc[0]
>     y = pdf[y_column]
>     X = pdf[x_columns]
>     X = sm.add_constant(X)
>     model = sm.OLS(y, X).fit()
>     return pd.DataFrame([[group_key] + [model.params[i] for i in x_columns]],
>                         columns=[group_column] + x_columns)
> beta = df.groupby(group_column).apply(ols)
> {code}
> Although the UDF (linear regression) has nothing to do with the grouping 
> column, the user still has to handle the grouping column inside the UDF. In 
> other words, the UDF is tightly coupled with the grouping column.
>  
> Following the discussion in 
> [https://github.com/apache/spark/pull/20211#discussion_r160524679], we 
> reached consensus on supporting an alternative function form:
> {code:python}
> def foo(key, pdf):
>     key  # this is the grouping key
>     pdf  # this is the pandas.DataFrame for that group
> pudf = pandas_udf(f=foo, returnType="id int, v double",
>                   functionType=PandasUDFType.GROUP_MAP)
> df.groupby(group_column).apply(pudf)
> {code}
> With this form, the example above becomes:
> {code:python}
> import pandas as pd
> import statsmodels.api as sm
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> # df has four columns: id, y, x1, x2
> group_column = 'id'
> y_column = 'y'
> x_columns = ['x1', 'x2']
> schema = df.select(group_column, *x_columns).schema
> # Inputs are the grouping key and a pandas.DataFrame; output is a pandas.DataFrame
> @pandas_udf(schema, PandasUDFType.GROUP_MAP)
> def ols(key, pdf):
>     y = pdf[y_column]
>     X = pdf[x_columns]
>     X = sm.add_constant(X)
>     model = sm.OLS(y, X).fit()
>     # key is a tuple, so the output row is built by tuple concatenation
>     return pd.DataFrame([key + tuple(model.params[i] for i in x_columns)])
> beta = df.groupby(group_column).apply(ols)
> {code}
>  
> In summary:
>  * Support the alternative form f(key, pdf). The current form f(pdf) will 
> still be supported; the two are distinguished through function inspection.
>  * In both cases, the UDF output schema will be the final output schema of 
> the Spark DataFrame.
>  * The key will be passed to the user as a Python tuple.
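
The summary points above can be tried out locally without Spark. The sketch below is only an illustration under stated assumptions: `apply_grouped`, `mean_with_key` and `mean_without_key` are hypothetical names, plain pandas stands in for the Spark groupby apply, and counting parameters with `inspect.signature` is one possible way to do the function inspection, not necessarily what the pull request does.

```python
import inspect

import pandas as pd


def apply_grouped(df, by, func):
    """Hypothetical local stand-in for df.groupby(by).apply(udf); not Spark code."""
    by = [by] if isinstance(by, str) else list(by)
    # Function inspection: count the parameters the user function declares.
    n_params = len(inspect.signature(func).parameters)
    if n_params not in (1, 2):
        raise ValueError("expected f(pdf) or f(key, pdf)")
    results = []
    for key, pdf in df.groupby(by, sort=True):
        # Always hand the key over as a tuple, even for one grouping column.
        key = key if isinstance(key, tuple) else (key,)
        pdf = pdf.reset_index(drop=True)
        results.append(func(pdf) if n_params == 1 else func(key, pdf))
    # The concatenated UDF outputs are the final result in both cases.
    return pd.concat(results, ignore_index=True)


def mean_with_key(key, pdf):
    # Alternative form: the key arrives as a tuple and is prepended to the row.
    return pd.DataFrame([key + (pdf["v"].mean(),)], columns=["id", "v"])


def mean_without_key(pdf):
    # Current form: the function has to read the key back out of the data.
    return pd.DataFrame({"id": [pdf["id"].iloc[0]], "v": [pdf["v"].mean()]})


df = pd.DataFrame({"id": [1, 1, 2], "v": [1.0, 3.0, 5.0]})
out_key = apply_grouped(df, "id", mean_with_key)
out_plain = apply_grouped(df, "id", mean_without_key)
```

Both calls go through the same entry point, and only the number of declared parameters decides whether the key is passed, so existing one-argument functions keep working unchanged.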



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
