[ https://issues.apache.org/jira/browse/SPARK-21190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069269#comment-16069269 ]

Leif Walsh commented on SPARK-21190:
------------------------------------

I agree with [~icexelloss] that we should aim to provide an API which exposes 
"logical" groups of data to the UDF rather than the implementation detail of 
handing over whole partitions.  Setting aside for the moment the problem of 
dataset skew, which could cause one group to be very large, let's look at some 
use cases.

One obvious use case that tools like dplyr and pandas support is 
{{df.groupby(...).aggregate(...)}}.  Here, we group on some key and apply a 
function to each logical group.  This can be used to e.g. demean each group 
w.r.t. its cohort.  Another use case that we care about with 
[Flint|https://github.com/twosigma/flint] is aggregating over a window.  In 
pandas terminology this is the {{rolling}} operator.  One might want to, for 
each row, perform a moving window average or rolling regression over a history 
of some size.  The windowed aggregation poses a performance question that the 
groupby case doesn't: if we naively send each window to the Python worker 
independently, we transfer a lot of duplicate data, since overlapping windows 
contain many of the same rows.  An option here is to transfer the entire 
partition on the backend and then instruct the Python worker to call the UDF on 
slices of that block according to the windowing requested by the user, as 
sketched below.
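
As a concrete, pandas-only sketch of that last point (purely illustrative; the 
helper name {{rolling_apply}} is made up here, not a proposed API): rather than 
shipping each overlapping window separately, the worker could hold one block in 
memory and call the UDF on slices of it.

{code}
import pandas as pd

def rolling_apply(block, window, udf):
    """Call `udf` on overlapping slices of one in-memory block,
    instead of transferring each window to the worker separately."""
    return pd.Series(
        [udf(block.iloc[max(0, i - window + 1):i + 1]) for i in range(len(block))],
        index=block.index,
    )

block = pd.DataFrame({"value": [1.0, 2.0, 4.0, 8.0, 16.0]})
# A moving-window average over a history of 3 rows.
means = rolling_apply(block, 3, lambda w: w["value"].mean())
{code}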

I think the idea of presenting a whole partition in a pandas dataframe to a UDF 
is a bit off-track.  If someone really wants to apply a Python function to the 
"whole" dataset, they'd be best served by pulling those data back to the driver 
and just using pandas; if they tried to use Spark's partitions, they'd get 
somewhat arbitrary partitions and have to implement some kind of merge operator 
on their own.  However, with grouped and windowed aggregations, we can provide 
an API which truly is parallelizable and useful.

I want to focus on use cases where we can actually parallelize, right now, 
without requiring a merge operator.  Aggregators in pandas and related tools in 
the ecosystem usually assume they have access to all the data for an operation 
and don't need to merge the results of sub-aggregations.  For aggregations over 
larger datasets, you'd really want to encourage the use of native Spark 
operations (that use e.g. {{treeAggregate}}), as in the sketch below.
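
For contrast, here's roughly what the merge-based path looks like in native 
Spark: a minimal {{treeAggregate}} sketch computing a mean over an RDD.  The 
seqOp/combOp pair is exactly the merge operator that pandas-style aggregators 
don't give you.

{code}
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
rdd = sc.parallelize(range(1000))

# seqOp folds rows into a per-partition (sum, count); combOp merges those
# partial results across partitions (and across tree levels).
total, count = rdd.treeAggregate(
    (0, 0),
    lambda acc, x: (acc[0] + x, acc[1] + 1),   # seqOp
    lambda a, b: (a[0] + b[0], a[1] + b[1]),   # combOp
)
mean = total / count
{code}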

Does that make sense?  I think it focuses the problem nicely enough that it 
becomes fairly tractable.

I think the really hard part of this API design is deciding what the inputs and 
outputs of the UDF look like, and providing for the myriad use cases therein.  
For example, one might want to aggregate each group down to a scalar (e.g. a 
mean) and do something with that: either produce a reduced dataset with one 
value per group, or add a column where each group has the same value across all 
rows.  Alternatively, one might want to compute over the group, produce a value 
per row within the group, and attach that as a new column (e.g. demeaning or 
ranking).  These translate roughly to the differences between the [**ply 
operations in dplyr|https://www.jstatsoft.org/article/view/v040i01/v40i01.pdf], 
or, in pandas, the differences between {{df.groupby(...).agg(...)}}, 
{{df.groupby(...).transform(...)}}, and {{df.groupby(...).apply(...)}}.
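
A quick pandas illustration of those three shapes, on a toy frame with a group 
key and a value column:

{code}
import pandas as pd

df = pd.DataFrame({"key": ["a", "a", "b"], "value": [1.0, 3.0, 5.0]})

# agg: one scalar per group -> a reduced result with one value per group.
per_group_mean = df.groupby("key")["value"].agg("mean")

# transform: one value per input row, aligned with the original index ->
# a natural fit for attaching a new column (demeaning, ranking, ...).
df["demeaned"] = df.groupby("key")["value"].transform(lambda v: v - v.mean())

# apply: the most general form; the UDF sees each group and may return
# a scalar, a Series, or a DataFrame.
per_group_range = df.groupby("key")["value"].apply(lambda v: v.max() - v.min())
{code}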

> SPIP: Vectorized UDFs in Python
> -------------------------------
>
>                 Key: SPARK-21190
>                 URL: https://issues.apache.org/jira/browse/SPARK-21190
>             Project: Spark
>          Issue Type: New Feature
>          Components: PySpark, SQL
>    Affects Versions: 2.2.0
>            Reporter: Reynold Xin
>            Assignee: Reynold Xin
>              Labels: SPIP
>         Attachments: SPIPVectorizedUDFsforPython (1).pdf
>
>
> *Background and Motivation*
> Python is one of the most popular programming languages among Spark users. 
> Spark currently exposes a row-at-a-time interface for defining and executing 
> user-defined functions (UDFs). This introduces high overhead in serialization 
> and deserialization, and also makes it difficult to leverage Python libraries 
> (e.g. numpy, Pandas) that are written in native code.
>  
> This proposal advocates introducing new APIs to support vectorized UDFs in 
> Python, in which a block of data is transferred over to Python in some 
> columnar format for execution.
>  
>  
> *Target Personas*
> Data scientists, data engineers, library developers.
>  
> *Goals*
> - Support vectorized UDFs that apply on chunks of the data frame
> - Low system overhead: Substantially reduce serialization and deserialization 
> overhead when compared with row-at-a-time interface
> - UDF performance: Enable users to leverage native libraries in Python (e.g. 
> numpy, Pandas) for data manipulation in these UDFs
>  
> *Non-Goals*
> The following are explicitly out of scope for the current SPIP, and should be 
> done in future SPIPs. Nonetheless, it would be good to consider these future 
> use cases during API design, so we can achieve some consistency when rolling 
> out new APIs.
>  
> - Define block oriented UDFs in other languages (that are not Python).
> - Define aggregate UDFs
> - Tight integration with machine learning frameworks
>  
> *Proposed API Changes*
> The following sketches some possibilities. I haven’t spent a lot of time 
> thinking about the API (wrote it down in 5 mins) and I am not attached to 
> this design at all. The main purpose of the SPIP is to get feedback on use 
> cases and see how they can impact API design.
>  
> Two things to consider are:
>  
> 1. Python is dynamically typed, whereas DataFrames/SQL requires static, 
> analysis time typing. This means users would need to specify the return type 
> of their UDFs.
>  
> 2. Ratio of input rows to output rows. We propose initially we require number 
> of output rows to be the same as the number of input rows. In the future, we 
> can consider relaxing this constraint with support for vectorized aggregate 
> UDFs.
>  
> Proposed API sketch (using examples):
>  
> Use case 1. A function that defines all the columns of a DataFrame (similar 
> to a “map” function):
>  
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_on_entire_df(input):
>   """ Some user-defined function.
>  
>   :param input: A Pandas DataFrame with two columns, a and b.
>   :return: A Pandas DataFrame.
>   """
>   input["c"] = input["a"] + input["b"]
>   input["d"] = input["a"] - input["b"]
>   return input
>  
> spark.range(1000).selectExpr("id a", "id / 2 b") \
>   .mapBatches(my_func_on_entire_df)
> {code}
>  
> Use case 2. A function that defines only one column (similar to existing 
> UDFs):
>  
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_that_returns_one_column(input):
>   """ Some user-defined function.
>  
>   :param input: A Pandas DataFrame with two columns, a and b.
>   :return: A numpy array.
>   """
>   return input["a"] + input["b"]
>  
> my_func = udf(my_func_that_returns_one_column)
>  
> df = spark.range(1000).selectExpr("id a", "id / 2 b")
> df.withColumn("c", my_func(df.a, df.b))
> {code}
>  
>  
>  
> *Optional Design Sketch*
> I’m more concerned about getting proper feedback for API design. The 
> implementation should be pretty straightforward and is not a huge concern at 
> this point. We can leverage the same implementation for faster toPandas 
> (using Arrow).
>  
>  
> *Optional Rejected Designs*
> See above.
>  
>  
>  
>  


