[ https://issues.apache.org/jira/browse/SPARK-21190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151376#comment-16151376 ]
Leif Walsh commented on SPARK-21190:
------------------------------------

Yep, that's totally a thing:

{noformat}
In [1]: import pandas as pd

In [2]: pd.DataFrame(index=list(range(100)))
Out[2]:
Empty DataFrame
Columns: []
Index: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]

[100 rows x 0 columns]

In [3]: df = Out[2]

In [4]: pd.Series(data=1, index=df.index)
Out[4]:
0     1
1     1
2     1
3     1
4     1
5     1
6     1
7     1
8     1
9     1
10    1
11    1
12    1
13    1
14    1
15    1
16    1
17    1
18    1
19    1
20    1
21    1
22    1
23    1
24    1
25    1
26    1
27    1
28    1
29    1
     ..
70    1
71    1
72    1
73    1
74    1
75    1
76    1
77    1
78    1
79    1
80    1
81    1
82    1
83    1
84    1
85    1
86    1
87    1
88    1
89    1
90    1
91    1
92    1
93    1
94    1
95    1
96    1
97    1
98    1
99    1
Length: 100, dtype: int64
{noformat}

So, how about:

{noformat}
@pandas_udf(LongType())
def f0(df):
    return pd.Series(data=1, index=df.index)

df.select(f0())
{noformat}

> SPIP: Vectorized UDFs in Python
> -------------------------------
>
>                 Key: SPARK-21190
>                 URL: https://issues.apache.org/jira/browse/SPARK-21190
>             Project: Spark
>          Issue Type: New Feature
>          Components: PySpark, SQL
>    Affects Versions: 2.2.0
>            Reporter: Reynold Xin
>            Assignee: Reynold Xin
>              Labels: SPIP
>        Attachments: SPIPVectorizedUDFsforPython (1).pdf
>
>
> *Background and Motivation*
> Python is one of the most popular programming languages among Spark users.
> Spark currently exposes a row-at-a-time interface for defining and executing
> user-defined functions (UDFs). This introduces high overhead in serialization
> and deserialization, and also makes it difficult to leverage Python libraries
> (e.g. numpy, Pandas) that are written in native code.
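For readers without a Spark shell handy, the broadcasting behaviour the session above relies on can be checked with plain pandas; this is just a local sketch of the same calls, no Spark involved:

```python
import pandas as pd

# A zero-column DataFrame still carries a full-length index...
df = pd.DataFrame(index=list(range(100)))
assert df.shape == (100, 0)

# ...and a scalar `data` argument to pd.Series is broadcast across
# that index, which is exactly what the proposed f0 would return.
s = pd.Series(data=1, index=df.index)
assert len(s) == 100
assert (s == 1).all()
```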
>
> This proposal advocates introducing new APIs to support vectorized UDFs in
> Python, in which a block of data is transferred over to Python in some
> columnar format for execution.
>
>
> *Target Personas*
> Data scientists, data engineers, library developers.
>
> *Goals*
> - Support vectorized UDFs that apply on chunks of the data frame
> - Low system overhead: Substantially reduce serialization and deserialization
> overhead when compared with the row-at-a-time interface
> - UDF performance: Enable users to leverage native libraries in Python (e.g.
> numpy, Pandas) for data manipulation in these UDFs
>
> *Non-Goals*
> The following are explicitly out of scope for the current SPIP and should be
> done in future SPIPs. Nonetheless, it would be good to consider these future
> use cases during API design, so we can achieve some consistency when rolling
> out new APIs.
>
> - Define block-oriented UDFs in other languages (that are not Python).
> - Define aggregate UDFs
> - Tight integration with machine learning frameworks
>
> *Proposed API Changes*
> The following sketches some possibilities. I haven't spent a lot of time
> thinking about the API (wrote it down in 5 mins) and I am not attached to
> this design at all. The main purpose of the SPIP is to get feedback on use
> cases and see how they can impact API design.
>
> A few things to consider are:
>
> 1. Python is dynamically typed, whereas DataFrames/SQL require static,
> analysis-time typing. This means users would need to specify the return type
> of their UDFs.
>
> 2. The ratio of input rows to output rows. We propose that initially we
> require the number of output rows to be the same as the number of input rows.
> In the future, we can consider relaxing this constraint with support for
> vectorized aggregate UDFs.
>
> 3. How do we handle null values, since Pandas doesn't have the concept of
> nulls?
>
> Proposed API sketch (using examples):
>
> Use case 1.
> A function that defines all the columns of a DataFrame (similar
> to a "map" function):
>
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_on_entire_df(input):
>     """ Some user-defined function.
>
>     :param input: A Pandas DataFrame with two columns, a and b.
>     :return: :class: A Pandas data frame.
>     """
>     input['c'] = input['a'] + input['b']
>     input['d'] = input['a'] - input['b']
>     return input
>
> spark.range(1000).selectExpr("id a", "id / 2 b")
>     .mapBatches(my_func_on_entire_df)
> {code}
>
> Use case 2. A function that defines only one column (similar to existing
> UDFs):
>
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_that_returns_one_column(input):
>     """ Some user-defined function.
>
>     :param input: A Pandas DataFrame with two columns, a and b.
>     :return: :class: A numpy array
>     """
>     return input['a'] + input['b']
>
> my_func = udf(my_func_that_returns_one_column)
>
> df = spark.range(1000).selectExpr("id a", "id / 2 b")
> df.withColumn("c", my_func(df.a, df.b))
> {code}
>
>
> *Optional Design Sketch*
> I'm more concerned about getting proper feedback for the API design. The
> implementation should be pretty straightforward and is not a huge concern at
> this point. We can leverage the same implementation for a faster toPandas
> (using Arrow).
>
>
> *Optional Rejected Designs*
> See above.
>
>
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
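Since the quoted blocks above are only API sketches, here is a pure-pandas sketch (no Spark) of what the two use cases, and the null-handling concern from point 3, would look like on the pandas side. The column names `a` and `b` follow the SPIP example; the sample data and everything else are illustrative assumptions:

```python
import pandas as pd

def my_func_on_entire_df(batch: pd.DataFrame) -> pd.DataFrame:
    # Use case 1: a "map"-style UDF that sees a whole batch and
    # returns a DataFrame with the same number of rows.
    batch = batch.copy()
    batch['c'] = batch['a'] + batch['b']
    batch['d'] = batch['a'] - batch['b']
    return batch

def my_func_that_returns_one_column(a: pd.Series, b: pd.Series) -> pd.Series:
    # Use case 2: a UDF that produces a single new column.
    return a + b

# A batch shaped like the example's selectExpr("id a", "id / 2 b").
df = pd.DataFrame({'a': range(5), 'b': [x / 2 for x in range(5)]})

out = my_func_on_entire_df(df)
assert list(out.columns) == ['a', 'b', 'c', 'd']
assert len(out) == len(df)  # 1:1 input/output rows, per point 2

df['c'] = my_func_that_returns_one_column(df['a'], df['b'])

# Point 3 (nulls): pandas has no integer-typed null, so a null entering
# an integer column surfaces as NaN and upcasts the column to float64.
with_null = pd.Series([1, 2, None])
assert str(with_null.dtype) == 'float64'
assert with_null.isna().sum() == 1
```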