[ https://issues.apache.org/jira/browse/SPARK-21190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16162205#comment-16162205 ]
Bryan Cutler commented on SPARK-21190:
--------------------------------------

Thanks [~icexelloss]. I definitely think collaboration should be encouraged, especially in this case, where there was existing work done by you and me that could have been leveraged. The major difference between [#18659|https://github.com/apache/spark/pull/18659] and [#19147|https://github.com/apache/spark/pull/19147] is the Arrow data format used: #19147 uses the Arrow stream format, which has some pros and cons that I brought up in the PR.

> SPIP: Vectorized UDFs in Python
> -------------------------------
>
>                 Key: SPARK-21190
>                 URL: https://issues.apache.org/jira/browse/SPARK-21190
>             Project: Spark
>          Issue Type: New Feature
>          Components: PySpark, SQL
>    Affects Versions: 2.2.0
>            Reporter: Reynold Xin
>            Assignee: Reynold Xin
>              Labels: SPIP
>         Attachments: SPIPVectorizedUDFsforPython (1).pdf
>
>
> *Background and Motivation*
>
> Python is one of the most popular programming languages among Spark users. Spark currently exposes a row-at-a-time interface for defining and executing user-defined functions (UDFs). This introduces high overhead in serialization and deserialization, and also makes it difficult to leverage Python libraries (e.g. numpy, Pandas) that are written in native code.
>
> This proposal advocates introducing new APIs to support vectorized UDFs in Python, in which a block of data is transferred over to Python in some columnar format for execution.
>
> *Target Personas*
>
> Data scientists, data engineers, library developers.
>
> *Goals*
>
> - Support vectorized UDFs that apply to chunks of the data frame
> - Low system overhead: substantially reduce serialization and deserialization overhead compared with the row-at-a-time interface
> - UDF performance: enable users to leverage native libraries in Python (e.g. numpy, Pandas) for data manipulation in these UDFs
>
> *Non-Goals*
>
> The following are explicitly out of scope for the current SPIP and should be done in future SPIPs. Nonetheless, it would be good to consider these future use cases during API design, so we can achieve some consistency when rolling out new APIs.
>
> - Define block-oriented UDFs in other languages (that are not Python).
> - Define aggregate UDFs.
> - Tight integration with machine learning frameworks.
>
> *Proposed API Changes*
>
> The following sketches some possibilities. I haven't spent a lot of time thinking about the API (wrote it down in 5 mins) and I am not attached to this design at all. The main purpose of the SPIP is to get feedback on use cases and see how they can impact API design.
>
> A few things to consider are:
>
> 1. Python is dynamically typed, whereas DataFrames/SQL requires static, analysis-time typing. This means users would need to specify the return type of their UDFs.
>
> 2. Ratio of input rows to output rows. We propose that initially the number of output rows be required to match the number of input rows. In the future, we can consider relaxing this constraint with support for vectorized aggregate UDFs.
>
> 3. How do we handle null values, given that Pandas doesn't have the concept of nulls?
>
> Proposed API sketch (using examples):
>
> Use case 1. A function that defines all the columns of a DataFrame (similar to a "map" function):
>
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_on_entire_df(input):
>     """ Some user-defined function.
>
>     :param input: A Pandas DataFrame with two columns, a and b.
>     :return: :class: A Pandas data frame.
>     """
>     input['c'] = input['a'] + input['b']
>     input['d'] = input['a'] - input['b']
>     return input
>
> spark.range(1000).selectExpr("id a", "id / 2 b") \
>     .mapBatches(my_func_on_entire_df)
> {code}
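To pin down what Use case 1 computes, here is a minimal pandas-only sketch of the UDF body applied to one batch of rows. Note that `@spark_udf` and `mapBatches` are the SPIP's proposed names, not an existing API, so the sketch exercises only the function itself; the sample batch and its size are assumptions for illustration:

{code}
import pandas as pd

# The UDF body from Use case 1, operating on a whole batch at once.
def my_func_on_entire_df(batch):
    batch['c'] = batch['a'] + batch['b']
    batch['d'] = batch['a'] - batch['b']
    return batch  # same number of rows out as in, per consideration 2

# A hypothetical 5-row batch shaped like selectExpr("id a", "id / 2 b").
batch = pd.DataFrame({'a': range(5), 'b': [i / 2 for i in range(5)]})
print(my_func_on_entire_df(batch))
{code}

Because the batch crosses the JVM/Python boundary in columnar form, the additions above run as single vectorized pandas operations rather than once per row, which is the serialization and invocation overhead the SPIP aims to remove.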
> """ > input[c] = input[a] + input[b] > Input[d] = input[a] - input[b] > return input > > spark.range(1000).selectExpr("id a", "id / 2 b") > .mapBatches(my_func_on_entire_df) > {code} > > Use case 2. A function that defines only one column (similar to existing > UDFs): > > {code} > @spark_udf(some way to describe the return schema) > def my_func_that_returns_one_column(input): > """ Some user-defined function. > > :param input: A Pandas DataFrame with two columns, a and b. > :return: :class: A numpy array > """ > return input[a] + input[b] > > my_func = udf(my_func_that_returns_one_column) > > df = spark.range(1000).selectExpr("id a", "id / 2 b") > df.withColumn("c", my_func(df.a, df.b)) > {code} > > > > *Optional Design Sketch* > I’m more concerned about getting proper feedback for API design. The > implementation should be pretty straightforward and is not a huge concern at > this point. We can leverage the same implementation for faster toPandas > (using Arrow). > > > *Optional Rejected Designs* > See above. > > > > -- This message was sent by Atlassian JIRA (v6.4.14#64029) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org