[ https://issues.apache.org/jira/browse/SPARK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-23011:
------------------------------------

    Assignee:     (was: Apache Spark)

> Prepend missing grouping columns in groupby apply
> -------------------------------------------------
>
>                 Key: SPARK-23011
>                 URL: https://issues.apache.org/jira/browse/SPARK-23011
>             Project: Spark
>          Issue Type: Sub-task
>          Components: PySpark
>    Affects Versions: 2.3.0
>            Reporter: Li Jin
>
> The current semantics of groupby apply are that its output schema is the
> same as the output schema of the UDF. Because the grouping column is usually
> useful to users, they often need to output the grouping columns from the
> UDF. To illustrate, consider the following example:
> {code:java}
> import pandas as pd
> import statsmodels.api as sm
> from pyspark.sql.functions import pandas_udf, PandasUDFType
>
> # df has four columns: id, y, x1, x2
> group_column = 'id'
> y_column = 'y'
> x_columns = ['x1', 'x2']
> schema = df.select(group_column, *x_columns).schema
>
> # Input/output are both a pandas.DataFrame
> @pandas_udf(schema, PandasUDFType.GROUP_MAP)
> def ols(pdf):
>     # The group key has to be extracted only so it can be returned.
>     group_key = pdf[group_column].iloc[0]
>     y = pdf[y_column]
>     X = pdf[x_columns]
>     X = sm.add_constant(X)
>     model = sm.OLS(y, X).fit()
>     return pd.DataFrame([[group_key] + [model.params[i] for i in x_columns]],
>                         columns=[group_column] + x_columns)
>
> beta = df.groupby(group_column).apply(ols)
> {code}
> Although the UDF (linear regression) has nothing to do with the grouping
> column, the user has to handle the grouping column inside the UDF. In other
> words, the UDF is tightly coupled with the grouping column.
> Here I propose that we prepend grouping columns that are not returned by the
> UDF to the result of groupby apply.
> With this change, users can write UDFs that are decoupled from the grouping
> column:
> {code:java}
> import pandas as pd
> import statsmodels.api as sm
> from pyspark.sql.functions import pandas_udf, PandasUDFType
>
> # df has four columns: id, y, x1, x2
> group_column = 'id'
> y_column = 'y'
> x_columns = ['x1', 'x2']
> schema = df.select(*x_columns).schema
>
> # Input/output are both a pandas.DataFrame
> @pandas_udf(schema, PandasUDFType.GROUP_MAP)
> def ols(pdf):
>     y = pdf[y_column]
>     X = pdf[x_columns]
>     X = sm.add_constant(X)
>     model = sm.OLS(y, X).fit()
>     return pd.DataFrame([[model.params[i] for i in x_columns]],
>                         columns=x_columns)
>
> beta = df.groupby(group_column).apply(ols)
> {code}
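
The proposed rule (prepend only those grouping columns that the UDF did not return) can be illustrated without Spark. What follows is a minimal pure-Python sketch under stated assumptions, not Spark's implementation: rows are modeled as plain dicts, and `groupby_apply`, `mean_y`, and `mean_y_with_id` are hypothetical names introduced only for this illustration.

```python
def groupby_apply(rows, group_columns, func):
    """Hypothetical helper: apply func to each group of rows, then prepend
    any grouping columns that func did not return itself."""
    # Bucket rows by their grouping-key tuple, keeping first-seen group order.
    groups = {}
    for row in rows:
        key = tuple(row[c] for c in group_columns)
        groups.setdefault(key, []).append(row)

    result = []
    for key, group_rows in groups.items():
        for out_row in func(group_rows):
            # Only grouping columns missing from the output are prepended;
            # columns the function already returned are left untouched.
            missing = [(c, v) for c, v in zip(group_columns, key)
                       if c not in out_row]
            result.append(dict(missing + list(out_row.items())))
    return result


def mean_y(group_rows):
    # Decoupled from the grouping column: it never reads or returns "id".
    ys = [r["y"] for r in group_rows]
    return [{"mean_y": sum(ys) / len(ys)}]


def mean_y_with_id(group_rows):
    # Coupled variant: returns the grouping column itself, so nothing is added.
    ys = [r["y"] for r in group_rows]
    return [{"mean_y": sum(ys) / len(ys), "id": group_rows[0]["id"]}]


rows = [
    {"id": 1, "y": 1.0},
    {"id": 1, "y": 3.0},
    {"id": 2, "y": 5.0},
]
decoupled = groupby_apply(rows, ["id"], mean_y)
coupled = groupby_apply(rows, ["id"], mean_y_with_id)
```

In this sketch `decoupled` gains an `id` column in front of `mean_y`, while `coupled` keeps the columns exactly as the function returned them, which mirrors the "not returned by the UDF" condition in the proposal.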