Welcome to the first real SPIP.

SPIP: Vectorized UDFs for Python



https://issues.apache.org/jira/browse/SPARK-21190



Background and Motivation:



Python is one of the most popular programming languages among Spark users.
Spark currently exposes a row-at-a-time interface for defining and
executing user-defined functions (UDFs). This introduces high overhead in
serialization and deserialization, and also makes it difficult to leverage
Python libraries (e.g. numpy, Pandas) that are written in native code.



This proposal advocates introducing new APIs to support vectorized UDFs in
Python, in which a block of data is transferred over to Python in some
columnar format for execution.
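
To make the difference concrete, here is a small sketch in plain pandas/numpy, with no Spark involved. The function names and the 1000-row block are made up for illustration. It only contrasts the two calling conventions (one Python call per row versus one call per block) and does not model how Spark would actually move data between the JVM and Python.

```python
import numpy as np
import pandas as pd


def add_row_at_a_time(rows):
    """Invoke the Python-level addition once per row (today's UDF model)."""
    return [a + b for a, b in rows]


def add_vectorized(block):
    """Invoke the addition once per block; the loop runs in native code."""
    return block["a"] + block["b"]


# Illustrative block of data: column a is 0..999, column b is a / 2.
ids = np.arange(1000)
block = pd.DataFrame({"a": ids, "b": ids / 2})

per_row = add_row_at_a_time(zip(block["a"], block["b"]))
per_block = add_vectorized(block)
```

Both paths compute the same values; the vectorized one simply hands the whole column to numpy in a single call.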





Target Personas:

Data scientists, data engineers, library developers.



Goals:

- Support vectorized UDFs that apply on chunks of the data frame

- Low system overhead: Substantially reduce serialization and
deserialization overhead when compared with row-at-a-time interface

- UDF performance: Enable users to leverage native libraries in Python
(e.g. numpy, Pandas) for data manipulation in these UDFs





Non-Goals:



The following are explicitly out of scope for the current SPIP, and should
be done in future SPIPs. Nonetheless, it would be good to consider these
future use cases during API design, so we can achieve some consistency when
rolling out new APIs.



- Define block-oriented UDFs in languages other than Python

- Define aggregate UDFs

- Tight integration with machine learning frameworks





Proposed API Changes:



The following sketches some possibilities. I haven’t spent a lot of time
thinking about the API (wrote it down in 5 mins) and I am not attached to
this design at all. The main purpose of the SPIP is to get feedback on use
cases and see how they can impact API design.



Some things to consider are:



1. Python is dynamically typed, whereas DataFrames/SQL require static,
analysis-time typing. This means users would need to specify the return
type of their UDFs.



2. Ratio of input rows to output rows. We propose that, initially, we
require the number of output rows to be the same as the number of input
rows. In the future, we can consider relaxing this constraint with support
for vectorized aggregate UDFs.



3. On the Python side, what do we expose? The common libraries are numpy
(more for numeric / machine learning), and Pandas (more for structured data
analysis).
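
As a rough illustration of points 1 and 2, the sketch below uses a made-up decorator, vectorized_udf, written in plain pandas. It is not a proposed Spark API; the name, the use of a pandas dtype string as the "return type", and the error message are all assumptions. It only shows the two checks a real implementation would need: a declared return type, and one output row per input row.

```python
import functools

import numpy as np
import pandas as pd


def vectorized_udf(return_type):
    """Hypothetical decorator: record a declared return type and enforce
    that the wrapped function returns one output row per input row."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(batch):
            result = func(batch)
            if len(result) != len(batch):
                raise ValueError(
                    f"expected {len(batch)} output rows, got {len(result)}"
                )
            # Cast to the declared type, since Python alone does not fix it.
            values = np.asarray(result)
            return pd.Series(values, index=batch.index).astype(return_type)

        wrapper.return_type = return_type
        return wrapper

    return decorator


@vectorized_udf("float64")
def a_plus_b(batch):
    return batch["a"] + batch["b"]


@vectorized_udf("float64")
def drops_rows(batch):
    # Filters rows, so it violates the same-row-count constraint.
    return batch["a"][batch["a"] > 0]


batch = pd.DataFrame({"a": [0, 1, 2], "b": [0.0, 0.5, 1.0]})
result = a_plus_b(batch)
```

Calling drops_rows(batch) raises ValueError here, which is the behavior point 2 would imply for a function that changes the number of rows.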





Proposed API sketch (using examples):



Use case 1. A function that defines all the columns of a DataFrame (similar
to a “map” function):



@spark_udf(some way to describe the return schema)
def my_func_on_entire_df(input):
    """ Some user-defined function.

    :param input: A Pandas DataFrame with two columns, a and b.
    :return: A Pandas DataFrame.
    """
    # Column names are strings, so they must be quoted.
    input["c"] = input["a"] + input["b"]
    input["d"] = input["a"] - input["b"]
    return input



(spark.range(1000).selectExpr("id a", "id / 2 b")
    .mapBatches(my_func_on_entire_df))
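
Since mapBatches does not exist yet, here is a pandas-only sketch of the semantics I have in mind for use case 1. The helper map_batches_locally and the batch size of 256 are purely hypothetical stand-ins: the helper slices a local DataFrame into chunks, applies the function to each chunk, and concatenates the results. The function copies its input rather than mutating it, which is a choice made for this sketch only.

```python
import numpy as np
import pandas as pd


def my_func_on_entire_df(batch):
    """Same logic as the example above: add columns c and d."""
    batch = batch.copy()  # avoid mutating a slice of the caller's frame
    batch["c"] = batch["a"] + batch["b"]
    batch["d"] = batch["a"] - batch["b"]
    return batch


def map_batches_locally(pdf, func, batch_size):
    """Hypothetical local stand-in for mapBatches: apply func per chunk."""
    chunks = [
        pdf.iloc[start:start + batch_size]
        for start in range(0, len(pdf), batch_size)
    ]
    return pd.concat([func(chunk) for chunk in chunks], ignore_index=True)


# Mirrors spark.range(1000).selectExpr("id a", "id / 2 b") on local data.
ids = np.arange(1000)
pdf = pd.DataFrame({"a": ids, "b": ids / 2})
out = map_batches_locally(pdf, my_func_on_entire_df, batch_size=256)
```

The function never sees the whole data set at once, only one chunk at a time, so its result must not depend on where the chunk boundaries fall.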





Use case 2. A function that defines only one column (similar to existing
UDFs):



@spark_udf(some way to describe the return schema)
def my_func_that_returns_one_column(input):
    """ Some user-defined function.

    :param input: A Pandas DataFrame with two columns, a and b.
    :return: A numpy array
    """
    # Column names are strings, so they must be quoted.
    return input["a"] + input["b"]



my_func = udf(my_func_that_returns_one_column)



df = spark.range(1000).selectExpr("id a", "id / 2 b")

df.withColumn("c", my_func(df.a, df.b))
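
For use case 2, a comparable pandas-only sketch follows. The helper with_column_locally and the batch size of 300 are hypothetical and only emulate what withColumn with a vectorized UDF might do: call the function once per chunk, then stitch the per-chunk arrays back together as a new column. Here the function converts its result to a numpy array explicitly, which is one possible answer to the "what do we expose" question, not a decision.

```python
import numpy as np
import pandas as pd


def my_func_that_returns_one_column(batch):
    """Return one value per input row as a numpy array."""
    return (batch["a"] + batch["b"]).to_numpy()


def with_column_locally(pdf, name, func, batch_size):
    """Hypothetical local stand-in for withColumn with a vectorized UDF."""
    pieces = [
        func(pdf.iloc[start:start + batch_size])
        for start in range(0, len(pdf), batch_size)
    ]
    # assign returns a new frame; the input frame is left unchanged.
    return pdf.assign(**{name: np.concatenate(pieces)})


ids = np.arange(1000)
pdf = pd.DataFrame({"a": ids, "b": ids / 2})
out = with_column_locally(
    pdf, "c", my_func_that_returns_one_column, batch_size=300
)
```

Because the output has exactly one value per input row, the per-chunk arrays line up with the original rows without any extra bookkeeping.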





Optional Design Sketch:



I’m more concerned about getting proper feedback for API design. The
implementation should be pretty straightforward and is not a huge concern
at this point. We can leverage the same implementation for faster toPandas
(using Arrow).





Optional Rejected Designs:



See above.
