[ https://issues.apache.org/jira/browse/SPARK-21190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16100582#comment-16100582 ]
Li Jin edited comment on SPARK-21190 at 7/25/17 8:17 PM:
---------------------------------------------------------

I have created this PR for the groupby().apply() use case with a pandas UDF: https://github.com/apache/spark/pull/18732

This PR consists of a few parts that could be broken into smaller PRs:

(1) pandas_udf: a new API in pyspark.sql.functions that allows using pandas UDFs with the various PySpark SQL functions discussed here (group, window, withColumn).

(2) Arrow record batch -> unsafe row conversion: a set of converters that turn Arrow record batches into unsafe rows. (The other direction has already been implemented.) This is needed for pandas UDFs regardless of the API choices.

(3) PythonRDD and pyspark.worker: support for passing Arrow data between executors and the Python worker, and for processing Arrow data in the PySpark worker.

(4) df.groupby().apply(): a new API in PySpark SQL that implements the split-apply-merge pattern with PySpark and pandas UDFs, as sketched below.

I am hoping to get some feedback here, as well as to work together on figuring out the next steps for vectorized UDFs.
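For concreteness, here is a minimal sketch of what the split-apply-merge pattern in (4) could look like from the user's side. The pandas_udf and groupby().apply() names come from the PR, but the exact signature, in particular how the return schema is declared, is an assumption here and may differ from what is ultimately merged; `spark` is assumed to be an existing SparkSession.

{code}
# Sketch only: assumes the pandas_udf / groupby().apply() API proposed in
# https://github.com/apache/spark/pull/18732; the schema-declaration
# syntax below is illustrative, not final.
from pyspark.sql.functions import pandas_udf

@pandas_udf("id long, v double")  # declared return schema (assumed syntax)
def subtract_mean(pdf):
    # pdf is a pandas DataFrame holding all rows for one group
    return pdf.assign(v=pdf.v - pdf.v.mean())

# `spark` is assumed to be an existing SparkSession
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0)], ("id", "v"))

# Split by id, apply subtract_mean to each group's pandas DataFrame,
# and merge the per-group results back into a single Spark DataFrame.
df.groupby("id").apply(subtract_mean).show()
{code}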
> SPIP: Vectorized UDFs in Python
> -------------------------------
>
>                  Key: SPARK-21190
>                  URL: https://issues.apache.org/jira/browse/SPARK-21190
>              Project: Spark
>           Issue Type: New Feature
>           Components: PySpark, SQL
>     Affects Versions: 2.2.0
>             Reporter: Reynold Xin
>             Assignee: Reynold Xin
>               Labels: SPIP
>          Attachments: SPIPVectorizedUDFsforPython (1).pdf
>
>
> *Background and Motivation*
> Python is one of the most popular programming languages among Spark users. Spark currently exposes a row-at-a-time interface for defining and executing user-defined functions (UDFs). This introduces high overhead in serialization and deserialization, and also makes it difficult to leverage Python libraries (e.g. numpy, pandas) that are written in native code.
>
> This proposal advocates introducing new APIs to support vectorized UDFs in Python, in which a block of data is transferred over to Python in some columnar format for execution.
>
> *Target Personas*
> Data scientists, data engineers, library developers.
>
> *Goals*
> - Support vectorized UDFs that apply on chunks of the data frame
> - Low system overhead: substantially reduce serialization and deserialization overhead compared with the row-at-a-time interface
> - UDF performance: enable users to leverage native libraries in Python (e.g. numpy, pandas) for data manipulation in these UDFs
>
> *Non-Goals*
> The following are explicitly out of scope for the current SPIP and should be done in future SPIPs. Nonetheless, it would be good to consider these future use cases during API design, so we can achieve some consistency when rolling out new APIs.
>
> - Define block-oriented UDFs in other languages (that are not Python)
> - Define aggregate UDFs
> - Tight integration with machine learning frameworks
>
> *Proposed API Changes*
> The following sketches some possibilities. I haven't spent a lot of time thinking about the API (wrote it down in 5 mins) and I am not attached to this design at all. The main purpose of the SPIP is to get feedback on use cases and see how they can impact API design.
>
> A few things to consider are:
>
> 1. Python is dynamically typed, whereas DataFrames/SQL require static, analysis-time typing. This means users would need to specify the return type of their UDFs.
>
> 2. Ratio of input rows to output rows. We propose that initially we require the number of output rows to be the same as the number of input rows. In the future, we can consider relaxing this constraint with support for vectorized aggregate UDFs.
>
> 3. How do we handle null values, since Pandas doesn't have the concept of nulls?
>
> Proposed API sketch (using examples):
>
> Use case 1. A function that defines all the columns of a DataFrame (similar to a "map" function):
>
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_on_entire_df(input):
>     """ Some user-defined function.
>
>     :param input: A Pandas DataFrame with two columns, a and b.
>     :return: A Pandas DataFrame.
>     """
>     input['c'] = input['a'] + input['b']
>     input['d'] = input['a'] - input['b']
>     return input
>
> spark.range(1000).selectExpr("id AS a", "id / 2 AS b") \
>     .mapBatches(my_func_on_entire_df)
> {code}
>
> Use case 2. A function that defines only one column (similar to existing UDFs):
>
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_that_returns_one_column(a, b):
>     """ Some user-defined function.
>
>     :param a: A Pandas Series holding column a.
>     :param b: A Pandas Series holding column b.
>     :return: A numpy array or Pandas Series.
>     """
>     return a + b
>
> df = spark.range(1000).selectExpr("id AS a", "id / 2 AS b")
> df.withColumn("c", my_func_that_returns_one_column(df.a, df.b))
> {code}
>
> *Optional Design Sketch*
> I'm more concerned about getting proper feedback for API design. The implementation should be pretty straightforward and is not a huge concern at this point. We can leverage the same implementation for faster toPandas (using Arrow).
>
> *Optional Rejected Designs*
> See above.
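On question 3 above (null handling), here is a short, self-contained pandas example of the behavior a vectorized UDF author would see. This is plain pandas, independent of whatever Spark API is chosen; the point is that nulls arriving from a nullable column surface as NaN on the pandas side, which also forces an integer column to float64.

{code}
# Plain pandas illustration of the null-handling question; no Spark involved.
# Nulls in an incoming column become NaN in the pandas representation.
import pandas as pd

s = pd.Series([1, 2, None, 4])
print(s.dtype)          # float64 -- the null forces the integers to float
print(s.isna().sum())   # 1

# A vectorized UDF body must decide what NaN means for its computation,
# e.g. fill with a default before operating on the values:
result = s.fillna(0) + 10
print(result.tolist())  # [11.0, 12.0, 10.0, 14.0]
{code}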