[ https://issues.apache.org/jira/browse/SPARK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-23011:
------------------------------------

    Assignee:     (was: Apache Spark)

> Prepend missing grouping columns in groupby apply
> -------------------------------------------------
>
>                 Key: SPARK-23011
>                 URL: https://issues.apache.org/jira/browse/SPARK-23011
>             Project: Spark
>          Issue Type: Sub-task
>          Components: PySpark
>    Affects Versions: 2.3.0
>            Reporter: Li Jin
>
> Under the current semantics of groupby apply, the output schema of groupby 
> apply is exactly the output schema of the UDF. Because the grouping columns 
> are usually useful to users, users often have to return them from the UDF 
> themselves. To illustrate, consider the following example:
> {code:java}
> import pandas as pd
> import statsmodels.api as sm
> from pyspark.sql.functions import pandas_udf, PandasUDFType
>
> # df has four columns: id, y, x1, x2
> group_column = 'id'
> y_column = 'y'
> x_columns = ['x1', 'x2']
> # The output schema must include the grouping column
> schema = df.select(group_column, *x_columns).schema
>
> # Input/output are both a pandas.DataFrame
> @pandas_udf(schema, PandasUDFType.GROUP_MAP)
> def ols(pdf):
>     # The UDF has to recover the group key itself
>     group_key = pdf[group_column].iloc[0]
>     y = pdf[y_column]
>     X = pdf[x_columns]
>     X = sm.add_constant(X)
>     model = sm.OLS(y, X).fit()
>     return pd.DataFrame([[group_key] + [model.params[i] for i in x_columns]],
>                         columns=[group_column] + x_columns)
>
> beta = df.groupby(group_column).apply(ols)
> {code}
> Although the UDF (linear regression) has nothing to do with the grouping 
> column, the user still has to handle the grouping column inside the UDF. In 
> other words, the UDF is tightly coupled to the grouping column.
> Here I propose that we prepend any grouping columns not returned by the UDF 
> to the result of groupby apply. With this change, users can write UDFs that 
> are decoupled from the grouping column:
> {code:java}
> import pandas as pd
> import statsmodels.api as sm
> from pyspark.sql.functions import pandas_udf, PandasUDFType
>
> # df has four columns: id, y, x1, x2
> group_column = 'id'
> y_column = 'y'
> x_columns = ['x1', 'x2']
> # The output schema no longer mentions the grouping column
> schema = df.select(*x_columns).schema
>
> # Input/output are both a pandas.DataFrame
> @pandas_udf(schema, PandasUDFType.GROUP_MAP)
> def ols(pdf):
>     y = pdf[y_column]
>     X = pdf[x_columns]
>     X = sm.add_constant(X)
>     model = sm.OLS(y, X).fit()
>     return pd.DataFrame([[model.params[i] for i in x_columns]],
>                         columns=x_columns)
>
> beta = df.groupby(group_column).apply(ols)
> {code}
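
The proposed prepend rule can be illustrated without Spark. The following is a minimal sketch in plain pandas, not the actual PySpark implementation: the helper names `prepend_missing_group_columns` and `groupby_apply`, the sample data, and the `column_means` UDF are all hypothetical and exist only for illustration. It shows one reading of the proposal: grouping columns already returned by the UDF are left untouched, and the missing ones are inserted at the front in grouping order.

```python
import pandas as pd


def prepend_missing_group_columns(result, group_cols, key):
    """Hypothetical helper: prepend grouping columns the UDF result lacks."""
    out = result.copy()
    # Walk the keys in reverse so that insert(0, ...) preserves their order.
    for col, value in reversed(list(zip(group_cols, key))):
        if col not in out.columns:
            out.insert(0, col, value)
    return out


def groupby_apply(df, group_cols, udf):
    """Hypothetical pandas-only stand-in for df.groupby(...).apply(udf)."""
    pieces = []
    for key, pdf in df.groupby(group_cols, sort=True):
        # pandas may yield a scalar or a tuple key depending on its version.
        key = key if isinstance(key, tuple) else (key,)
        pieces.append(prepend_missing_group_columns(udf(pdf), group_cols, key))
    return pd.concat(pieces, ignore_index=True)


def column_means(pdf):
    # A UDF that never mentions the grouping column.
    return pd.DataFrame([[pdf["x1"].mean(), pdf["x2"].mean()]],
                        columns=["x1", "x2"])


df = pd.DataFrame({
    "id": [1, 1, 2, 2],
    "y": [0.5, 1.5, 2.5, 3.5],
    "x1": [1.0, 3.0, 5.0, 7.0],
    "x2": [2.0, 4.0, 6.0, 8.0],
})

beta = groupby_apply(df, ["id"], column_means)
print(beta)
```

Here `beta` has the columns `id`, `x1`, `x2` even though `column_means` returns only `x1` and `x2`. A UDF that already returns `id` would pass through unchanged, which keeps existing UDFs working under this sketch.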



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
