This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new aaf413ce351 [SPARK-44508][PYTHON][DOCS] Add user guide for Python user-defined table functions
aaf413ce351 is described below

commit aaf413ce351dd716096333df140f45f7f1bd5dd6
Author: allisonwang-db <allison.w...@databricks.com>
AuthorDate: Fri Sep 8 14:25:51 2023 +0900

    [SPARK-44508][PYTHON][DOCS] Add user guide for Python user-defined table functions

    ### What changes were proposed in this pull request?

    This PR adds a user guide for Python user-defined table functions (UDTFs) introduced in Spark 3.5.

    <img width="468" alt="Screenshot 2023-08-04 at 14 46 13" src="https://github.com/apache/spark/assets/66282705/11f5dc5e-681b-4677-a466-1a23c0b8dd01">

    ### Why are the changes needed?

    To help users write Python UDTFs.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    docs test

    Closes #42272 from allisonwang-db/spark-44508-udtf-user-guide.

    Authored-by: allisonwang-db <allison.w...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 examples/src/main/python/sql/udtf.py              | 240 ++++++++++++++++++++++
 python/docs/source/user_guide/sql/index.rst       |   1 +
 python/docs/source/user_guide/sql/python_udtf.rst | 233 +++++++++++++++++++++
 python/mypy.ini                                   |   6 +
 python/pyspark/sql/functions.py                   |   7 -
 5 files changed, 480 insertions(+), 7 deletions(-)

diff --git a/examples/src/main/python/sql/udtf.py b/examples/src/main/python/sql/udtf.py
new file mode 100644
index 00000000000..768eb73566e
--- /dev/null
+++ b/examples/src/main/python/sql/udtf.py
@@ -0,0 +1,240 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A simple example demonstrating Python UDTFs in Spark.
+Run with:
+    ./bin/spark-submit examples/src/main/python/sql/udtf.py
+"""
+
+# NOTE that this file is imported in the User Guides in PySpark documentation.
+# The code is referenced by line numbers; see also the `literalinclude` directive in Sphinx.
+from pyspark.sql import SparkSession
+from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version

+
+# The Arrow optimization for Python UDTFs requires pandas and PyArrow.
+require_minimum_pandas_version()
+require_minimum_pyarrow_version()
+
+
+def python_udtf_simple_example(spark: SparkSession) -> None:
+
+    # Define the UDTF class and implement the required `eval` method.
+    class SquareNumbers:
+        def eval(self, start: int, end: int):
+            for num in range(start, end + 1):
+                yield (num, num * num)
+
+    from pyspark.sql.functions import lit, udtf
+
+    # Create a UDTF using the class definition and the `udtf` function.
+    square_num = udtf(SquareNumbers, returnType="num: int, squared: int")
+
+    # Invoke the UDTF in PySpark.
+    square_num(lit(1), lit(3)).show()
+    # +---+-------+
+    # |num|squared|
+    # +---+-------+
+    # |  1|      1|
+    # |  2|      4|
+    # |  3|      9|
+    # +---+-------+
+
+
+def python_udtf_decorator_example(spark: SparkSession) -> None:
+
+    from pyspark.sql.functions import lit, udtf
+
+    # Define a UDTF using the `udtf` decorator directly on the class.
+    @udtf(returnType="num: int, squared: int")
+    class SquareNumbers:
+        def eval(self, start: int, end: int):
+            for num in range(start, end + 1):
+                yield (num, num * num)
+
+    # Invoke the UDTF in PySpark using the SquareNumbers class directly.
+    SquareNumbers(lit(1), lit(3)).show()
+    # +---+-------+
+    # |num|squared|
+    # +---+-------+
+    # |  1|      1|
+    # |  2|      4|
+    # |  3|      9|
+    # +---+-------+
+
+
+def python_udtf_registration(spark: SparkSession) -> None:
+
+    from pyspark.sql.functions import udtf
+
+    @udtf(returnType="word: string")
+    class WordSplitter:
+        def eval(self, text: str):
+            for word in text.split(" "):
+                yield (word.strip(),)
+
+    # Register the UDTF for use in Spark SQL.
+    spark.udtf.register("split_words", WordSplitter)
+
+    # Example: Using the UDTF in SQL.
+    spark.sql("SELECT * FROM split_words('hello world')").show()
+    # +-----+
+    # | word|
+    # +-----+
+    # |hello|
+    # |world|
+    # +-----+
+
+    # Example: Using the UDTF with a lateral join in SQL.
+    # The lateral join allows us to reference the columns and aliases
+    # in the previous FROM clause items as inputs to the UDTF.
+    spark.sql(
+        "SELECT * FROM VALUES ('Hello World'), ('Apache Spark') t(text), "
+        "LATERAL split_words(text)"
+    ).show()
+    # +------------+------+
+    # |        text|  word|
+    # +------------+------+
+    # | Hello World| Hello|
+    # | Hello World| World|
+    # |Apache Spark|Apache|
+    # |Apache Spark| Spark|
+    # +------------+------+
+
+
+def python_udtf_arrow_example(spark: SparkSession) -> None:
+
+    from pyspark.sql.functions import udtf
+
+    @udtf(returnType="c1: int, c2: int", useArrow=True)
+    class PlusOne:
+        def eval(self, x: int):
+            yield x, x + 1
+
+
+def python_udtf_date_expander_example(spark: SparkSession) -> None:
+
+    from datetime import datetime, timedelta
+    from pyspark.sql.functions import lit, udtf
+
+    @udtf(returnType="date: string")
+    class DateExpander:
+        def eval(self, start_date: str, end_date: str):
+            current = datetime.strptime(start_date, '%Y-%m-%d')
+            end = datetime.strptime(end_date, '%Y-%m-%d')
+            while current <= end:
+                yield (current.strftime('%Y-%m-%d'),)
+                current += timedelta(days=1)
+
+    DateExpander(lit("2023-02-25"), lit("2023-03-01")).show()
+    # +----------+
+    # |      date|
+    # +----------+
+    # |2023-02-25|
+    # |2023-02-26|
+    # |2023-02-27|
+    # |2023-02-28|
+    # |2023-03-01|
+    # +----------+
+
+
+def python_udtf_terminate_example(spark: SparkSession) -> None:
+
+    from pyspark.sql.functions import udtf
+
+    @udtf(returnType="cnt: int")
+    class CountUDTF:
+        def __init__(self):
+            # Initialize the counter to 0 when an instance of the class is created.
+            self.count = 0
+
+        def eval(self, x: int):
+            # Increment the counter by 1 for each input value received.
+            self.count += 1
+
+        def terminate(self):
+            # Yield the final count when the UDTF is done processing.
+            yield self.count,
+
+    spark.udtf.register("count_udtf", CountUDTF)
+    spark.sql("SELECT * FROM range(0, 10, 1, 1), LATERAL count_udtf(id)").show()
+    # +---+---+
+    # | id|cnt|
+    # +---+---+
+    # |  9| 10|
+    # +---+---+
+    spark.sql("SELECT * FROM range(0, 10, 1, 2), LATERAL count_udtf(id)").show()
+    # +---+---+
+    # | id|cnt|
+    # +---+---+
+    # |  4|  5|
+    # |  9|  5|
+    # +---+---+
+
+
+def python_udtf_table_argument(spark: SparkSession) -> None:
+
+    from pyspark.sql.functions import udtf
+    from pyspark.sql.types import Row
+
+    @udtf(returnType="id: int")
+    class FilterUDTF:
+        def eval(self, row: Row):
+            if row["id"] > 5:
+                yield row["id"],
+
+    spark.udtf.register("filter_udtf", FilterUDTF)
+
+    spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
+    # +---+
+    # | id|
+    # +---+
+    # |  6|
+    # |  7|
+    # |  8|
+    # |  9|
+    # +---+
+
+
+if __name__ == "__main__":
+    spark = SparkSession \
+        .builder \
+        .appName("Python UDTF example") \
+        .getOrCreate()
+
+    print("Running Python UDTF simple example")
+    python_udtf_simple_example(spark)
+
+    print("Running Python UDTF decorator example")
+    python_udtf_decorator_example(spark)
+
+    print("Running Python UDTF registration example")
+    python_udtf_registration(spark)
+
+    print("Running Python UDTF arrow example")
+    python_udtf_arrow_example(spark)
+
+    print("Running Python UDTF date expander example")
+    python_udtf_date_expander_example(spark)
+
+    print("Running Python UDTF terminate example")
+    python_udtf_terminate_example(spark)
+
+    print("Running Python UDTF table argument example")
+    python_udtf_table_argument(spark)
+
+    spark.stop()
diff --git a/python/docs/source/user_guide/sql/index.rst b/python/docs/source/user_guide/sql/index.rst
index 4cab99efa35..c0369de6786 100644
--- a/python/docs/source/user_guide/sql/index.rst
+++ b/python/docs/source/user_guide/sql/index.rst
@@ -24,4 +24,5 @@ Spark SQL
     :maxdepth: 2
 
     arrow_pandas
+    python_udtf
diff --git a/python/docs/source/user_guide/sql/python_udtf.rst b/python/docs/source/user_guide/sql/python_udtf.rst
new file mode 100644
index 00000000000..0e583915c58
--- /dev/null
+++ b/python/docs/source/user_guide/sql/python_udtf.rst
@@ -0,0 +1,233 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+..    http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+   software distributed under the License is distributed on an
+   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+   KIND, either express or implied.  See the License for the
+   specific language governing permissions and limitations
+   under the License.
+
+===========================================
+Python User-defined Table Functions (UDTFs)
+===========================================
+
+Spark 3.5 introduces the Python user-defined table function (UDTF), a new type of
+user-defined function. Unlike scalar functions that return a single result value from
+each call, each UDTF is invoked in the ``FROM`` clause of a query and returns an
+entire table as output. Each UDTF call can accept zero or more arguments. These
+arguments can be either scalar expressions or table arguments that represent entire
+input tables.
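+
+As a quick sketch of the overall shape (``MyUDTF`` and its schema here are
+placeholders, not part of the examples used elsewhere in this guide):
+
+.. code-block:: python
+
+    from pyspark.sql.functions import lit, udtf
+
+    # Each call to `eval` may yield zero or more rows of the result table.
+    @udtf(returnType="x: int")
+    class MyUDTF:
+        def eval(self, n: int):
+            for i in range(n):
+                yield (i,)
+
+    # Invoking the UDTF returns a DataFrame, much like reading a table.
+    MyUDTF(lit(3)).show()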
+
+Implementing a Python UDTF
+--------------------------
+
+.. currentmodule:: pyspark.sql.functions
+
+To implement a Python UDTF, you first need to define a class that implements the
+following methods:
+
+.. code-block:: python
+
+    class PythonUDTF:
+
+        def __init__(self) -> None:
+            """
+            Initializes the user-defined table function (UDTF). This is optional.
+
+            This method serves as the default constructor and is called once when the
+            UDTF is instantiated on the executor side.
+
+            Any class fields assigned in this method will be available for subsequent
+            calls to the `eval` and `terminate` methods. This class instance will remain
+            alive until all rows in the current partition have been consumed by the `eval`
+            method.
+
+            Notes
+            -----
+            - This method does not accept any extra arguments. Only the default
+              constructor is supported.
+            - You cannot create or reference the Spark session within the UDTF. Any
+              attempt to do so will result in a serialization error.
+            """
+            ...
+
+        def eval(self, *args: Any) -> Iterator[Any]:
+            """
+            Evaluates the function using the given input arguments.
+
+            This method is required and must be implemented.
+
+            Argument Mapping:
+            - Each provided scalar expression maps to exactly one value in the
+              `*args` list.
+            - Each provided table argument maps to a pyspark.sql.Row object containing
+              the columns in the order they appear in the provided input table,
+              and with the names computed by the query analyzer.
+
+            This method is called on every input row and can produce zero or more
+            output rows. Each element in the output tuple corresponds to one column
+            specified in the return type of the UDTF.
+
+            Parameters
+            ----------
+            *args : Any
+                Arbitrary positional arguments representing the input to the UDTF.
+
+            Yields
+            ------
+            tuple
+                A tuple representing a single row in the UDTF result table.
+                Yield as many times as needed to produce multiple rows.
+
+            Notes
+            -----
+            - The result of the function must be a tuple representing a single row
+              in the UDTF result table.
+            - UDTFs currently do not accept keyword arguments during the function call.
+
+            Examples
+            --------
+            An eval that returns one row and one column for each input:
+
+            >>> def eval(self, x: int):
+            ...     yield (x, )
+
+            An eval that returns two rows and two columns for each input:
+
+            >>> def eval(self, x: int, y: int):
+            ...     yield (x + y, x - y)
+            ...     yield (y + x, y - x)
+            """
+            ...
+
+        def terminate(self) -> Iterator[Any]:
+            """
+            Called when the UDTF has processed all input rows.
+
+            This method is optional to implement and is useful for performing any
+            cleanup or finalization operations after the UDTF has finished processing
+            all rows. It can also be used to yield additional rows if needed.
+            Table functions that consume all rows in the entire input partition
+            and then compute and return the entire output table can do so from
+            this method as well (please be mindful of memory usage when doing
+            this).
+
+            Yields
+            ------
+            tuple
+                A tuple representing a single row in the UDTF result table.
+                Yield this if you want to return additional rows during termination.
+
+            Examples
+            --------
+            >>> def terminate(self) -> Iterator[Any]:
+            ...     yield "done", None
+            """
+            ...
+
+
+The return type of the UDTF defines the schema of the table it outputs.
+It must be either a ``StructType``, for example ``StructType().add("c1", StringType())``,
+or a DDL string representing a struct type, for example ``c1: string``.
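+
+For instance, the following two declarations describe the same two-column schema
+(a short illustrative sketch; the column names are arbitrary):
+
+.. code-block:: python
+
+    from pyspark.sql.types import IntegerType, StringType, StructType
+
+    # 1. As an explicit StructType:
+    schema = StructType().add("c1", StringType()).add("c2", IntegerType())
+
+    # 2. As an equivalent DDL-formatted string:
+    ddl_schema = "c1: string, c2: int"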
+
+**Example of UDTF Class Implementation**
+
+Here is a simple example of a UDTF class implementation:
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+    :language: python
+    :lines: 36-40
+    :dedent: 4
+
+
+**Instantiating a UDTF with the ``udtf`` Decorator**
+
+To make use of the UDTF, you'll first need to instantiate it using the ``@udtf`` decorator:
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+    :language: python
+    :lines: 60-77
+    :dedent: 4
+
+
+**Instantiating a UDTF with the ``udtf`` Function**
+
+An alternative way to create a UDTF is to use the :func:`udtf` function:
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+    :language: python
+    :lines: 42-55
+    :dedent: 4
+
+For more detailed usage, please see :func:`udtf`.
+
+
+Registering and Using Python UDTFs in SQL
+-----------------------------------------
+
+Python UDTFs can also be registered and used in SQL queries.
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+    :language: python
+    :lines: 82-116
+    :dedent: 4
+
+
+Arrow Optimization
+------------------
+Apache Arrow is an in-memory columnar data format used in Spark to efficiently transfer
+data between Java and Python processes. Apache Arrow is disabled by default for Python UDTFs.
+
+Arrow can improve performance when each input row generates a large result table from the UDTF.
+
+To enable Arrow optimization, set the ``spark.sql.execution.pythonUDTF.arrow.enabled``
+configuration to ``true``. You can also enable it by specifying the ``useArrow`` parameter
+when declaring the UDTF.
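+
+For example, to enable it session-wide (a minimal sketch, assuming ``spark`` is
+an active ``SparkSession``):
+
+.. code-block:: python
+
+    # Enable Arrow-based data transfer for all Python UDTFs in this session.
+    spark.conf.set("spark.sql.execution.pythonUDTF.arrow.enabled", "true")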
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+    :language: python
+    :lines: 121-126
+    :dedent: 4
+
+
+For more details, please see `Apache Arrow in PySpark <../arrow_pandas.rst>`_.
+
+
+TABLE input argument
+--------------------
+Python UDTFs can also take a TABLE as an input argument, and it can be used in conjunction
+with scalar input arguments.
+By default, only one TABLE argument is allowed per call, primarily for
+performance reasons. If you need more than one TABLE input argument,
+you can enable this by setting the ``spark.sql.tvf.allowMultipleTableArguments.enabled``
+configuration to ``true``.
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+    :language: python
+    :lines: 191-210
+    :dedent: 4
+
+
+More Examples
+-------------
+
+A Python UDTF that expands date ranges into individual dates:
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+    :language: python
+    :lines: 131-152
+    :dedent: 4
+
+
+A Python UDTF with ``__init__`` and ``terminate``:
+
+.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py
+    :language: python
+    :lines: 157-186
+    :dedent: 4
diff --git a/python/mypy.ini b/python/mypy.ini
index 4d1fc3ceb66..3443af9a865 100644
--- a/python/mypy.ini
+++ b/python/mypy.ini
@@ -80,6 +80,12 @@ disallow_untyped_defs = False
 [mypy-pyspark.worker]
 disallow_untyped_defs = False
 
+; Allow untyped defs and disable certain error codes in examples
+
+[mypy-python.sql.udtf]
+disallow_untyped_defs = False
+disable_error_code = attr-defined,arg-type,call-arg,union-attr
+
 ; Ignore errors in tests
 
 [mypy-pyspark.ml.tests.*]
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index c1e24ba25ac..3138cafa126 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -16891,13 +16891,6 @@ def udtf(
         Use "yield" to produce one row for the UDTF result relation as many times
         as needed. In the context of a lateral join, each such result row will be
         associated with the most recent input row consumed from the "eval" method.
-        Or, use "return" to produce multiple rows for the UDTF result relation at
-        once.
-
-        >>> class TestUDTF:
-        ...     def eval(self, a: int):
-        ...         return [(a, a + 1), (a, a + 2)]
-        >>> test_udtf = udtf(TestUDTF, returnType="x: int, y: int")
 
         User-defined table functions are considered opaque to the optimizer by default.
         As a result, operations like filters from WHERE clauses or limits from

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org