[ 
https://issues.apache.org/jira/browse/SPARK-28264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16882774#comment-16882774
 ] 

Maciej Szymkiewicz edited comment on SPARK-28264 at 7/11/19 9:18 AM:
---------------------------------------------------------------------

Personally I fail to see why some UDF types are needed at all. With "classic" 
UDFs (standard Python UDF, SCALAR, GROUPED_AGG) intended for use with the SQL API 
(GroupedData.agg, DataFrame.select, ...), the returned object could be described as:

 
{code}
class UserDefinedFunctionLike(Protocol):
    def __call__(self, *_: ColumnOrName) -> Column: ...
{code}
That makes perfect sense, as the result is called explicitly by the end user. 
Additionally, metadata has to be attached to such a UDF for it to be compatible 
with the SQL API.

Now GROUPED_MAP, MAP_ITER and COGROUPED_MAP are quite different beasts: they are 
not intended to be called directly by the end user. The API is still the same, 
and one can even call the object and get a Column, but such a Column is useless 
in the current API. That's confusing at best.

Unlike "SQL" UDFs, MAP_* and *_MAP are used in specialized contexts. This 
creates an opportunity. If we think about the functions with the following 
interfaces
{code}
class PandasGroupedMapFunction(Protocol):
    def __call__(
        self, _: pandas.core.frame.DataFrame
    ) -> pandas.core.frame.DataFrame: ...

class PandasMapIterFunction(Protocol):
    def __call__(
        self, _: Generator[pandas.core.frame.DataFrame]
    ) -> Generator[pandas.core.frame.DataFrame]: ...

class PandasCogroupFunction(Protocol):
    def __call__(
        self,
        left: pandas.core.frame.DataFrame,
        right: pandas.core.frame.DataFrame,
    ) -> Generator[pandas.core.frame.DataFrame]: ...
{code}
Then the UDFish metadata can be provided directly in the apply / 
mapPartitionsInPandas methods:
{code}
class CoGroupedData():
    def apply(
        self, udf: PandasCogroupFunction, returnType: DataTypeOrString
    ) -> DataFrame: ...

class GroupedData():
    def apply(
        self, udf: PandasGroupedMapFunction, returnType: DataTypeOrString
    ) -> DataFrame: ...
    def mapPartitionsInPandas(
        self, udf: PandasMapIterFunction, returnType: DataTypeOrString
    ) -> DataFrame: ...
{code}
Such a design could curb the proliferation of UDF types and provide a clear 
distinction between the SQL and non-SQL APIs. Additionally, separating the 
schema from the function could give users some additional flexibility: Pandas 
functions are often flexible enough to be applied to different schemas, while 
the schema can be derived at runtime, depending on the input. 
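
To make the flexibility argument concrete, here is a toy sketch in plain Python. It is only an illustration under stated assumptions: a list of dicts stands in for a pandas DataFrame, and `toy_apply` and `derive_schema` are hypothetical helpers, not part of PySpark. The point is just that the function carries no schema of its own, so the same function can be passed with whatever schema the input implies.

```python
from typing import Callable, Dict, List, Tuple

# A list of dicts stands in for a pandas DataFrame in this toy example.
Rows = List[Dict[str, float]]


def center(rows: Rows) -> Rows:
    """Subtract the per-column mean; works for any set of numeric columns."""
    if not rows:
        return []
    means = {name: sum(r[name] for r in rows) / len(rows) for name in rows[0]}
    return [{name: r[name] - means[name] for name in r} for r in rows]


def toy_apply(
    groups: List[Rows], func: Callable[[Rows], Rows], returnType: str
) -> Tuple[List[str], List[tuple]]:
    """Hypothetical stand-in for GroupedData.apply(udf, returnType)."""
    # The schema arrives with the call, not attached to the function.
    columns = [field.split()[0] for field in returnType.split(",")]
    out = [
        tuple(row[c] for c in columns)
        for group in groups
        for row in func(group)
    ]
    return columns, out


def derive_schema(groups: List[Rows]) -> str:
    """Build a DDL-like schema string from the input columns at runtime."""
    first_row = groups[0][0]
    return ", ".join(f"{name} double" for name in first_row)


one_col = [[{"x": 1.0}, {"x": 3.0}]]
two_col = [[{"x": 1.0, "y": 10.0}, {"x": 3.0, "y": 30.0}]]

# The same function is reused with two different schemas.
result_one = toy_apply(one_col, center, derive_schema(one_col))
result_two = toy_apply(two_col, center, derive_schema(two_col))
print(result_one)
print(result_two)
```

With a decorator that binds the return type to the function, `center` would have to be wrapped once per schema instead.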

That leaves us with the SCALAR_ITER thingy... If the current interface is to be 
preserved, one possible solution is to choose convention over configuration:
{code}
class PandasScalarFunction(Protocol):
    def __call__(
        self, *_: pandas.core.series.Series
    ) -> pandas.core.series.Series: ...

class PandasIterScalarFunction(Protocol):
    # Note Generator in the return type.
    def __call__(
        self, _: Iterator[pandas.core.series.Series]
    ) -> Generator[pandas.core.series.Series]: ...
{code}
In such a case we could avoid exposing new UDF types and simply distinguish 
between both cases based on the type of the input object, that is, on whether 
the user function is a generator function:
{code}
import inspect

if functionType is PandasUDFType.SCALAR and inspect.isgeneratorfunction(f):
    ...  # Go SQL_SCALAR_PANDAS_ITER_UDF path

elif functionType is PandasUDFType.SCALAR:
    ...  # Go SQL_SCALAR_PANDAS_UDF path
{code}
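
A self-contained sketch of that dispatch, runnable without Spark. The `EvalPath` enum and `choose_eval_path` are hypothetical names introduced only for this illustration, and lists stand in for pandas Series; only `inspect.isgeneratorfunction` is a real standard-library call.

```python
import inspect
from enum import Enum


class EvalPath(Enum):
    """Hypothetical labels for the two evaluation paths named above."""
    SCALAR = "SQL_SCALAR_PANDAS_UDF"
    SCALAR_ITER = "SQL_SCALAR_PANDAS_ITER_UDF"


def choose_eval_path(f):
    """Pick the evaluation path from the shape of the user function alone."""
    if inspect.isgeneratorfunction(f):
        return EvalPath.SCALAR_ITER
    return EvalPath.SCALAR


def plus_one(values):
    # Plain function: receives one batch, returns one batch.
    return [v + 1 for v in values]


def plus_one_iter(batches):
    # Generator function: receives an iterator of batches, yields batches.
    for values in batches:
        yield [v + 1 for v in values]


print(choose_eval_path(plus_one).value)
print(choose_eval_path(plus_one_iter).value)
```

One caveat with this convention: a plain function that merely returns an iterator, for example `lambda batches: map(plus_one, batches)`, is not a generator function and would take the scalar path. That is why the protocol above insists on Generator in the return type.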
A similar approach can be used if other _ITER variants are introduced in the 
future.

If the API is to be simplified:
{code}
class PandasIterScalarFunction(Protocol):
    def __call__(
        self, _: pandas.core.series.Series
    ) -> pandas.core.series.Series: ...
{code}
_ITER could simply be pushed into the pandas_udf signature:
{code}
def pandas_udf(f=None, returnType=None, functionType=None, iterate=False): ...
{code}
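
As a rough illustration of the mechanics only, the sketch below shows how such a keyword could travel with the function as metadata instead of requiring a separate UDF type. `toy_pandas_udf` is a hypothetical stand-in that mirrors the proposed signature; it is not the PySpark implementation, and it does not say how the engine would act on the flag.

```python
import functools


def toy_pandas_udf(f=None, returnType=None, functionType=None, iterate=False):
    """Hypothetical stand-in mirroring the signature proposed above."""
    if f is None:
        # Called with keyword arguments only: return the actual decorator.
        return functools.partial(
            toy_pandas_udf,
            returnType=returnType,
            functionType=functionType,
            iterate=iterate,
        )

    @functools.wraps(f)
    def wrapper(*args):
        return f(*args)

    # Metadata the engine could consult instead of a separate *_ITER type.
    wrapper.returnType = returnType
    wrapper.functionType = functionType
    wrapper.iterate = iterate
    return wrapper


@toy_pandas_udf(returnType="long", functionType="SCALAR", iterate=True)
def plus_one(values):
    # A list stands in for a pandas Series here.
    return [v + 1 for v in values]


@toy_pandas_udf
def identity(values):
    return values


print(plus_one.functionType, plus_one.iterate)
print(identity.iterate)
```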



> Revisiting Python / pandas UDF
> ------------------------------
>
>                 Key: SPARK-28264
>                 URL: https://issues.apache.org/jira/browse/SPARK-28264
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark, SQL
>    Affects Versions: 3.0.0
>            Reporter: Reynold Xin
>            Assignee: Reynold Xin
>            Priority: Major
>
> In the past two years, the pandas UDFs are perhaps the most important changes 
> to Spark for Python data science. However, these functionalities have evolved 
> organically, leading to some inconsistencies and confusions among users. This 
> document revisits UDF definition and naming, as a result of discussions among 
> Xiangrui, Li Jin, Hyukjin, and Reynold.
>  
> See document here: 
> https://docs.google.com/document/d/10Pkl-rqygGao2xQf6sddt0b-4FYK4g8qr_bXLKTL65A/edit
>  


