[ https://issues.apache.org/jira/browse/SPARK-21190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Reynold Xin updated SPARK-21190:
--------------------------------
    Description: 
*Background and Motivation*
Python is one of the most popular programming languages among Spark users. 
Spark currently exposes a row-at-a-time interface for defining and executing 
user-defined functions (UDFs). This introduces high overhead in serialization 
and deserialization, and also makes it difficult to leverage Python libraries 
(e.g. numpy, Pandas) that are written in native code.
 
This proposal advocates introducing new APIs to support vectorized UDFs in 
Python, in which a block of data is transferred over to Python in some columnar 
format for execution.

 
 
*Target Personas*
Data scientists, data engineers, library developers.
 

*Goals*
- Support vectorized UDFs that apply to chunks of a DataFrame
- Low system overhead: Substantially reduce serialization and deserialization 
overhead compared with the row-at-a-time interface
- UDF performance: Enable users to leverage native libraries in Python (e.g. 
numpy, Pandas) for data manipulation in these UDFs
 

*Non-Goals*
The following are explicitly out of scope for the current SPIP, and should be 
done in future SPIPs. Nonetheless, it would be good to consider these future 
use cases during API design, so we can achieve some consistency when rolling 
out new APIs.
 
- Define block oriented UDFs in other languages (that are not Python).
- Define aggregate UDFs
- Tight integration with machine learning frameworks

 
*Proposed API Changes*
The following sketches some possibilities. I haven’t spent a lot of time 
thinking about the API (wrote it down in 5 mins) and I am not attached to this 
design at all. The main purpose of the SPIP is to get feedback on use cases and 
see how they can impact API design.
 
Two things to consider are:
 
1. Python is dynamically typed, whereas DataFrames/SQL require static, 
analysis-time typing. This means users would need to specify the return type 
of their UDFs; a short illustration using the existing row-at-a-time API 
follows this list.
 
2. The ratio of input rows to output rows. We propose that, initially, the 
number of output rows be required to equal the number of input rows. In the 
future, we can consider relaxing this constraint with support for vectorized 
aggregate UDFs.
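 
To ground point 1: today's row-at-a-time Python UDFs already require an 
explicit return type, since the analyzer cannot infer one from dynamically 
typed Python, and a vectorized decorator would presumably need the same. A 
minimal sketch with the existing PySpark API (assuming a SparkSession named 
spark, as in the examples below):
 
{code}
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# The return type must be declared up front; the engine cannot infer it.
half = udf(lambda x: x / 2.0, DoubleType())

df = spark.range(1000)
df.select(half(df.id)).show()
{code}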
 
Proposed API sketch (using examples):
 
Use case 1. A function that defines all the columns of a DataFrame (similar to 
a “map” function):
 
{code}
@spark_udf(some way to describe the return schema)
def my_func_on_entire_df(input):
  """ Some user-defined function.
 
  :param input: A Pandas DataFrame with two columns, a and b.
  :return: A Pandas DataFrame with columns a, b, and c.
  """
  input["c"] = input["a"] + input["b"]
  return input
 
(spark.range(1000).selectExpr("id a", "id / 2 b")
  .mapBatches(my_func_on_entire_df))
{code}
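 
Since mapBatches is only a sketch, the intended per-block semantics can be 
illustrated with plain Pandas. Here add_c is a stand-in for the body of 
my_func_on_entire_df, applied to one hypothetical block of the DataFrame:
 
{code}
import pandas as pd

def add_c(pdf):
    # Same body as my_func_on_entire_df, minus the hypothetical decorator.
    pdf["c"] = pdf["a"] + pdf["b"]
    return pdf

# One block of the Spark DataFrame, already materialized in Pandas form.
batch = pd.DataFrame({"a": [0, 1, 2], "b": [0.0, 0.5, 1.0]})
print(add_c(batch))  # columns a, b, c; same row count as the input
{code}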
 
 
Use case 2. A function that defines only one column (similar to existing UDFs):
 
{code}
@spark_udf(some way to describe the return schema)
def my_func_that_returns_one_column(a, b):
  """ Some user-defined function.
 
  :param a: A Pandas Series holding column a.
  :param b: A Pandas Series holding column b.
  :return: A Pandas Series (or numpy array) of the same length.
  """
  return a + b
 
df = spark.range(1000).selectExpr("id a", "id / 2 b")
df.withColumn("c", my_func_that_returns_one_column(df.a, df.b))
{code}
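 
For contrast, the same column can be computed today with the existing 
row-at-a-time UDF interface, which is exactly the serialization-heavy path 
this SPIP aims to bypass:
 
{code}
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Invoked once per row: each pair of values is serialized to the Python
# worker and the result is serialized back, row by row.
row_at_a_time = udf(lambda a, b: a + b, DoubleType())

df = spark.range(1000).selectExpr("id a", "id / 2 b")
df.withColumn("c", row_at_a_time(df.a, df.b))
{code}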

 
 
 
*Optional Design Sketch*
I’m more concerned about getting proper feedback on the API design. The 
implementation should be pretty straightforward and is not a huge concern at 
this point. We can leverage the same implementation for a faster toPandas 
(using Arrow).
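 
As a rough illustration of the columnar transfer Arrow enables, the following 
sketch round-trips a Pandas DataFrame through the pyarrow IPC stream format, 
independent of any Spark integration:
 
{code}
import pandas as pd
import pyarrow as pa

pdf = pd.DataFrame({"a": [1, 2, 3], "b": [0.5, 1.0, 1.5]})

# Serialize the whole block in columnar form...
batch = pa.RecordBatch.from_pandas(pdf)
sink = pa.BufferOutputStream()
writer = pa.RecordBatchStreamWriter(sink, batch.schema)
writer.write_batch(batch)
writer.close()

# ...and deserialize it back without touching individual rows.
reader = pa.RecordBatchStreamReader(pa.BufferReader(sink.getvalue()))
roundtripped = reader.read_all().to_pandas()
{code}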

 
 
*Optional Rejected Designs*
See above.
 
 
 
 


  was:
*Background and Motivation*
Python is one of the most popular programming languages among Spark users. 
Spark currently exposes a row-at-a-time interface for defining and executing 
user-defined functions (UDFs). This introduces high overhead in serialization 
and deserialization, and also makes it difficult to leverage Python libraries 
that are written in native code. This proposal advocates introducing new APIs 
to support vectorized UDFs in Python, in which a block of data is transferred 
over to Python in some columnar format for execution.
 
 
*Target Personas*
Data scientists, data engineers, library developers.
 

*Goals*
... todo ...
 

*Non-Goals*
- Define block-oriented UDFs in languages other than Python.
- Define aggregate UDFs
 
 
*Proposed API Changes*
... todo ...
 
 
 
*Optional Design Sketch*
The implementation should be pretty straightforward and is not a huge concern 
at this point. I’m more concerned about getting proper feedback for API design.
 
 
*Optional Rejected Designs*
See above.
 
 
 
 



> SPIP: Vectorized UDFs in Python
> -------------------------------
>
>                 Key: SPARK-21190
>                 URL: https://issues.apache.org/jira/browse/SPARK-21190
>             Project: Spark
>          Issue Type: New Feature
>          Components: PySpark, SQL
>    Affects Versions: 2.2.0
>            Reporter: Reynold Xin
>            Assignee: Reynold Xin
>              Labels: SPIP