[ 
https://issues.apache.org/jira/browse/SPARK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Jin updated SPARK-23011:
---------------------------
    Description: 
The current semantics of groupby apply is that the output schema of groupby 
apply is the same as the output schema of the UDF. Because grouping column is 
usually useful to users, users often need to output grouping columns in the 
UDF. To further explain, consider the following example:
{code:java}
import statsmodels.api as sm
# df has four columns: id, y, x1, x2

group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(group_column, *x_columns).schema

@pandas_udf(schema, PandasUDFType.GROUP_MAP)
# Input/output are both a pandas.DataFrame
def ols(pdf):
    group_key = pdf[group_column].iloc[0]
    y = pdf[y_column]
    X = pdf[x_columns]
      X = sm.add_constant(X)
    model = sm.OLS(y, X).fit()

    return pd.DataFrame([[group_key] + [model.params[i] for i in   x_columns]], 
columns=[group_column] + x_columns)

beta = df.groupby(group_column).apply(ols)
{code}
Although the UDF (linear regression) has nothing to do with the grouping 
column, the user needs to deal with grouping column in the UDF. In other words, 
the UDF is tightly coupled with the grouping column.

 

With discussion in 
[https://github.com/apache/spark/pull/20211#discussion_r160524679,] we reached 
consensus for supporting an alternative function form:
{code:java}
def foo(key, pdf):
    key  # this is a grouping key. 
    pdf  # this is the Pandas DataFrame

pudf = pandas_udf(f=foo, returnType="id int, v double", functionType=GROUP_MAP)
df.groupby(group_column).apply(pudf){code}
{code:java}
import statsmodels.api as sm
# df has four columns: id, y, x1, x2

group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(group_column, *x_columns).schema

@pandas_udf(schema, PandasUDFType.GROUP_MAP)
# Input/output are both a pandas.DataFrame
def ols(key, pdf):
    y = pdf[y_column]
    X = pdf[x_columns]
    X = sm.add_constant(X)
    model = sm.OLS(y, X).fit()

    return pd.DataFrame([key + [model.params[i] for i in x_columns]])

beta = df.groupby(group_column).apply(ols)
{code}
 

In summary:
 * Support alternative form f(key, pdf). The current form f(pdf) will still be 
supported. (Through function inspection)
 * In both cases, the udf output schema will be the final output schema of the 
groupby apply.
 * Key will be passed to user as a python tuple.

  was:
The current semantics of groupby apply is that the output schema of groupby 
apply is the same as the output schema of the UDF. Because grouping column is 
usually useful to users, users often need to output grouping columns in the 
UDF. To further explain, consider the following example:
{code:java}
import statsmodels.api as sm
# df has four columns: id, y, x1, x2

group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(group_column, *x_columns).schema

@pandas_udf(schema, PandasUDFType.GROUP_MAP)
# Input/output are both a pandas.DataFrame
def ols(pdf):
    group_key = pdf[group_column].iloc[0]
    y = pdf[y_column]
    X = pdf[x_columns]
      X = sm.add_constant(X)
    model = sm.OLS(y, X).fit()

    return pd.DataFrame([[group_key] + [model.params[i] for i in   x_columns]], 
columns=[group_column] + x_columns)

beta = df.groupby(group_column).apply(ols)
{code}
Although the UDF (linear regression) has nothing to do with the grouping 
column, the user needs to deal with grouping column in the UDF. In other words, 
the UDF is tightly coupled with the grouping column.

 

After discussion in 

 


> Support alternative function form with group aggregate pandas UDF
> -----------------------------------------------------------------
>
>                 Key: SPARK-23011
>                 URL: https://issues.apache.org/jira/browse/SPARK-23011
>             Project: Spark
>          Issue Type: Sub-task
>          Components: PySpark
>    Affects Versions: 2.3.0
>            Reporter: Li Jin
>            Priority: Major
>
> The current semantics of groupby apply is that the output schema of groupby 
> apply is the same as the output schema of the UDF. Because grouping column is 
> usually useful to users, users often need to output grouping columns in the 
> UDF. To further explain, consider the following example:
> {code:java}
> import statsmodels.api as sm
> # df has four columns: id, y, x1, x2
> group_column = 'id'
> y_column = 'y'
> x_columns = ['x1', 'x2']
> schema = df.select(group_column, *x_columns).schema
> @pandas_udf(schema, PandasUDFType.GROUP_MAP)
> # Input/output are both a pandas.DataFrame
> def ols(pdf):
>     group_key = pdf[group_column].iloc[0]
>     y = pdf[y_column]
>     X = pdf[x_columns]
>       X = sm.add_constant(X)
>     model = sm.OLS(y, X).fit()
>     return pd.DataFrame([[group_key] + [model.params[i] for i in   
> x_columns]], columns=[group_column] + x_columns)
> beta = df.groupby(group_column).apply(ols)
> {code}
> Although the UDF (linear regression) has nothing to do with the grouping 
> column, the user needs to deal with grouping column in the UDF. In other 
> words, the UDF is tightly coupled with the grouping column.
>  
> With discussion in 
> [https://github.com/apache/spark/pull/20211#discussion_r160524679,] we 
> reached consensus for supporting an alternative function form:
> {code:java}
> def foo(key, pdf):
>     key  # this is a grouping key. 
>     pdf  # this is the Pandas DataFrame
> pudf = pandas_udf(f=foo, returnType="id int, v double", 
> functionType=GROUP_MAP)
> df.groupby(group_column).apply(pudf){code}
> {code:java}
> import statsmodels.api as sm
> # df has four columns: id, y, x1, x2
> group_column = 'id'
> y_column = 'y'
> x_columns = ['x1', 'x2']
> schema = df.select(group_column, *x_columns).schema
> @pandas_udf(schema, PandasUDFType.GROUP_MAP)
> # Input/output are both a pandas.DataFrame
> def ols(key, pdf):
>     y = pdf[y_column]
>     X = pdf[x_columns]
>     X = sm.add_constant(X)
>     model = sm.OLS(y, X).fit()
>     return pd.DataFrame([key + [model.params[i] for i in x_columns]])
> beta = df.groupby(group_column).apply(ols)
> {code}
>  
> In summary:
>  * Support alternative form f(key, pdf). The current form f(pdf) will still 
> be supported. (Through function inspection)
>  * In both cases, the udf output schema will be the final output schema of 
> the groupby apply.
>  * Key will be passed to user as a python tuple.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to