[jira] [Comment Edited] (SPARK-28264) Revisiting Python / pandas UDF

2019-07-11 Thread Maciej Szymkiewicz (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-28264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16882774#comment-16882774 ]

Maciej Szymkiewicz edited comment on SPARK-28264 at 7/11/19 9:23 AM:
-

Personally, I fail to see why some UDF types are needed at all. The "classic" UDFs (standard Python UDF, SCALAR, GROUPED_AGG) are intended for use with the SQL API (GroupedData.agg, DataFrame.select, ...), and the object they return could be described as:

{code:python}
class UserDefinedFunctionLike(Protocol):
    def __call__(self, *_: ColumnOrName) -> Column: ...
{code}
That makes perfect sense, as the result is called explicitly by the end user. Additionally, metadata has to be attached to such a UDF to make it compatible with the SQL API.
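For illustration, a stdlib-only sketch of the "SQL" UDF shape described above: calling the object never runs the wrapped function, it only builds a column expression, and the object carries the metadata the SQL API needs. `Column`, `SqlUdf` and the expression format are hypothetical stand-ins, not PySpark classes (the attribute names `returnType` and `evalType` loosely follow PySpark's `UserDefinedFunction`). Note that a `runtime_checkable` protocol only checks that `__call__` exists, not its signature.

```python
from dataclasses import dataclass
from typing import Protocol, Union, runtime_checkable


@dataclass(frozen=True)
class Column:
    """Hypothetical stand-in for pyspark.sql.Column: it only records an expression."""
    expr: str


ColumnOrName = Union[Column, str]


@runtime_checkable
class UserDefinedFunctionLike(Protocol):
    def __call__(self, *cols: ColumnOrName) -> Column: ...


class SqlUdf:
    """Hypothetical UDF wrapper: the user's function plus SQL-facing metadata."""

    def __init__(self, func, returnType: str, evalType: str) -> None:
        self.func = func
        self.returnType = returnType  # metadata needed by the SQL API
        self.evalType = evalType

    def __call__(self, *cols: ColumnOrName) -> Column:
        # Calling the UDF does not execute func; it builds a column expression.
        names = [c.expr if isinstance(c, Column) else c for c in cols]
        return Column(f"{self.func.__name__}({', '.join(names)})")


def plus_one(x):
    return x + 1


udf = SqlUdf(plus_one, returnType="long", evalType="SQL_BATCHED_UDF")
col = udf("x")
print(isinstance(udf, UserDefinedFunctionLike))  # True
print(col)  # Column(expr='plus_one(x)')
```

The end user calls `udf(...)` explicitly inside `select` or `agg`, which is exactly why this shape fits the SQL API.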

Now GROUPED_MAP, MAP_ITER and COGROUPED_MAP are quite different beasts: they are not intended to be called directly by the end user. The API is still the same, though; one can even call the object and get a Column, but such a Column is useless in the current API. That's confusing at best, and at the end of the day it is just a leaky abstraction: the fact that UDFs are used under the covers doesn't mean that the user has to be bothered with all the gory details.

Unlike the "SQL" UDFs, MAP_* and *_MAP are used in specialized contexts. This creates an opportunity. If we think about functions with the following interfaces:
{code:python}
from typing import Generator, Iterator, Protocol  # Protocol requires Python 3.8+

import pandas as pd


class PandasGroupedMapFunction(Protocol):
    def __call__(self, _: pd.DataFrame) -> pd.DataFrame: ...


class PandasMapIterFunction(Protocol):
    def __call__(self, _: Iterator[pd.DataFrame]) -> Generator[pd.DataFrame, None, None]: ...


class PandasCogroupFunction(Protocol):
    def __call__(self, left: pd.DataFrame, right: pd.DataFrame) -> Generator[pd.DataFrame, None, None]: ...
{code}
Then the UDF-like metadata (the return type) can be provided directly to the apply / mapPartitionsInPandas methods:
{code:python}
from __future__ import annotations


class CoGroupedData:
    def apply(self, udf: PandasCogroupFunction, returnType: DataTypeOrString) -> DataFrame: ...


class GroupedData:
    def apply(self, udf: PandasGroupedMapFunction, returnType: DataTypeOrString) -> DataFrame: ...


class DataFrame:
    def mapPartitionsInPandas(self, udf: PandasMapIterFunction, returnType: DataTypeOrString) -> DataFrame: ...
{code}
Such a design could curb the proliferation of UDF types and provide a clear distinction between the SQL and non-SQL APIs. Additionally, separating the schema from the function could give users some extra flexibility: pandas functions are often flexible enough to be applied to different schemas, while the schema can be derived at runtime, depending on the input.
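A toy, stdlib-only model of this design, assuming lists of dicts as stand-ins for pandas DataFrames and a list of column names as a stand-in for a DataTypeOrString schema. `ToyGroupedData`, `add_group_total` and `with_total` are hypothetical names, not PySpark API. It shows a plain, undecorated function passed to `apply` together with a separate `returnType`, and the same function reused on two inputs whose output schema is derived at runtime.

```python
from collections import defaultdict
from typing import Callable, Dict, List

Row = Dict[str, object]
Frame = List[Row]  # stand-in for pandas.DataFrame


class ToyGroupedData:
    """Hypothetical grouped dataset; not a PySpark class."""

    def __init__(self, rows: Frame, key: str) -> None:
        self.rows = rows
        self.key = key

    def apply(self, udf: Callable[[Frame], Frame], returnType: List[str]) -> Frame:
        # Group rows by key, keeping the order in which keys first appear.
        groups: Dict[object, Frame] = defaultdict(list)
        for row in self.rows:
            groups[row[self.key]].append(row)
        out: Frame = []
        for group in groups.values():
            for row in udf(group):
                # The schema comes from the caller, not from UDF metadata.
                if set(row) != set(returnType):
                    raise ValueError(f"{row} does not match {returnType}")
                out.append({name: row[name] for name in returnType})
        return out


def add_group_total(pdf: Frame) -> Frame:
    # A plain function: no decorator, no UDF type, works for any input columns.
    total = sum(r["v"] for r in pdf)
    return [{**r, "total": total} for r in pdf]


def with_total(columns: List[str]) -> List[str]:
    # Output schema derived at runtime from the input schema.
    return columns + ["total"]


a = [{"id": 1, "v": 2}, {"id": 1, "v": 3}, {"id": 2, "v": 4}]
b = [{"k": "x", "name": "p", "v": 10}]
print(ToyGroupedData(a, "id").apply(add_group_total, with_total(list(a[0]))))
print(ToyGroupedData(b, "k").apply(add_group_total, with_total(list(b[0]))))
```

Because the schema is an argument rather than part of a decorated UDF object, one function serves both inputs without being re-wrapped.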

That leaves us with the SCALAR_ITER thingy. If the current interface is to be preserved, one possible solution is to choose convention over configuration:
{code:python}
from typing import Generator, Iterator, Protocol

import pandas as pd


class PandasScalarFunction(Protocol):
    def __call__(self, *_: pd.Series) -> pd.Series: ...


class PandasIterScalarFunction(Protocol):
    def __call__(self, _: Iterator[pd.Series]) -> Generator[pd.Series, None, None]: ...  # Note Generator in the return type.
{code}
In such a case we could avoid exposing new UDF types and simply distinguish between the two cases based on the kind of function passed in (a generator function or a plain one):
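{code:python}
import inspect

# Inside pandas_udf, where f and functionType are its arguments.
# PandasUDFType constants are ints, so compare with == rather than is.
if functionType == PandasUDFType.SCALAR and inspect.isgeneratorfunction(f):
    ...  # Go SQL_SCALAR_PANDAS_ITER_UDF path
elif functionType == PandasUDFType.SCALAR:
    ...  # Go SQL_SCALAR_PANDAS_UDF path
{code}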
A similar approach can be used if other _ITER variants are introduced in the future.
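A runnable, stdlib-only sketch of this convention, assuming lists of floats as stand-ins for pandas Series. `PandasUDFType` here is a hypothetical `Enum` (PySpark's real constants are plain ints, which should be compared with `==`; an `Enum` makes `is` safe), and `resolve_eval_type` is a hypothetical helper that returns the name of the path that would be taken.

```python
import inspect
from enum import Enum
from typing import Generator, Iterator, List

Series = List[float]  # stand-in for pandas.Series


class PandasUDFType(Enum):
    # Hypothetical stand-in; the real PySpark constants are plain ints.
    SCALAR = "scalar"
    GROUPED_MAP = "grouped_map"


def resolve_eval_type(f, functionType: PandasUDFType) -> str:
    """Convention over configuration: a generator function selects the _ITER path."""
    if functionType is PandasUDFType.SCALAR and inspect.isgeneratorfunction(f):
        return "SQL_SCALAR_PANDAS_ITER_UDF"
    elif functionType is PandasUDFType.SCALAR:
        return "SQL_SCALAR_PANDAS_UDF"
    raise NotImplementedError(f"not covered by this sketch: {functionType}")


def plus_one(s: Series) -> Series:
    # PandasScalarFunction shape: one batch in, one batch out.
    return [x + 1 for x in s]


def plus_one_iter(batches: Iterator[Series]) -> Generator[Series, None, None]:
    # PandasIterScalarFunction shape: a generator over an iterator of batches.
    for s in batches:
        yield [x + 1 for x in s]


batches = [[1.0, 2.0], [3.0]]
print(resolve_eval_type(plus_one, PandasUDFType.SCALAR))       # SQL_SCALAR_PANDAS_UDF
print(resolve_eval_type(plus_one_iter, PandasUDFType.SCALAR))  # SQL_SCALAR_PANDAS_ITER_UDF
print([plus_one(s) for s in batches] == list(plus_one_iter(iter(batches))))  # True
```

One caveat of the convention: `inspect.isgeneratorfunction` only recognizes functions defined with `yield`, so a callable that returns some other iterator (for example `lambda it: map(g, it)`) would be routed to the non-iterator path.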

If the API is to be simplified:
{code:python}
class PandasIterScalarFunction(Protocol):
    def __call__(self, _: pandas.core.series.Series) -> pandas.core.series.Series: ...
{code}
_ITER could simply be pushed into the pandas_udf signature as a flag:
{code:python}
def pandas_udf(f=None, returnType=None, functionType=None, iterate=False): ...
{code}
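A minimal sketch of how such a flag might behave, assuming lists of floats as stand-ins for Series; `toy_pandas_udf` and `ToyPandasUDF` are hypothetical and only model the SCALAR case. The user always writes a per-batch function, and `iterate` only changes the recorded eval type (that is, how the worker would execute it), not the function's shape. The decorator form (`f=None`) is handled as the proposed signature suggests.

```python
from typing import Callable, Iterable, Iterator, List, Optional

Series = List[float]  # stand-in for pandas.Series


class ToyPandasUDF:
    """Hypothetical result of pandas_udf: the function plus its metadata."""

    def __init__(self, f: Callable[[Series], Series], returnType: Optional[str], evalType: str) -> None:
        self.f = f
        self.returnType = returnType
        self.evalType = evalType

    def run(self, batches: Iterable[Series]) -> Iterator[Series]:
        # The framework drives the per-batch function over the batches;
        # the user never writes the loop, whatever the value of iterate.
        return (self.f(s) for s in batches)


def toy_pandas_udf(f=None, returnType=None, functionType=None, iterate=False):
    # Hypothetical mirror of the proposed signature; only SCALAR is modeled.
    if f is None:
        # Decorator form: @toy_pandas_udf(returnType=..., iterate=...)
        return lambda g: toy_pandas_udf(g, returnType, functionType, iterate)
    evalType = "SQL_SCALAR_PANDAS_ITER_UDF" if iterate else "SQL_SCALAR_PANDAS_UDF"
    return ToyPandasUDF(f, returnType, evalType)


double = toy_pandas_udf(lambda s: [x * 2 for x in s], returnType="double", iterate=True)
print(double.evalType)                        # SQL_SCALAR_PANDAS_ITER_UDF
print(list(double.run([[1.0, 2.0], [3.0]])))  # [[2.0, 4.0], [6.0]]
```

Compared with the generator-based convention, the flag keeps user code identical for both variants at the cost of one more keyword argument.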

