dtenedor commented on code in PR #42272: URL: https://github.com/apache/spark/pull/42272#discussion_r1303379855
########## python/docs/source/user_guide/sql/python_udtf.rst: ########## @@ -0,0 +1,216 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +=========================================== +Python User-defined Table Functions (UDTFs) +=========================================== + +Spark 3.5 introduces Python user-defined table functions (UDTFs), a new type of user-defined function. +Unlike scalar functions that return a single result value for every input, UDTFs is invoked in the ``FROM`` Review Comment: ```suggestion Unlike scalar functions that return a single result value from each call, each UDTF is invoked in the ``FROM`` ``` ########## python/docs/source/user_guide/sql/python_udtf.rst: ########## @@ -0,0 +1,216 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. 
Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +=========================================== +Python User-defined Table Functions (UDTFs) +=========================================== + +Spark 3.5 introduces Python user-defined table functions (UDTFs), a new type of user-defined function. +Unlike scalar functions that return a single result value for every input, UDTFs is invoked in the ``FROM`` +clause of a query and returns an entire relation as output. +Each UDTF call can accept zero or more arguments. +These arguments can either be scalar expressions or table arguments that represent entire input relations. + +Implementing a Python UDTF +-------------------------- + +.. currentmodule:: pyspark.sql.functions + +To implement a Python UDTF, you can define a class implementing the methods: Review Comment: ```suggestion To implement a Python UDTF, you can define a class implementing the following methods: ``` ########## python/docs/source/user_guide/sql/python_udtf.rst: ########## @@ -0,0 +1,216 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. 
See the License for the + specific language governing permissions and limitations + under the License. + +=========================================== +Python User-defined Table Functions (UDTFs) +=========================================== + +Spark 3.5 introduces Python user-defined table functions (UDTFs), a new type of user-defined function. Review Comment: ```suggestion Spark 3.5 introduces the Python user-defined table function (UDTF), a new type of user-defined function. ``` ########## python/docs/source/user_guide/sql/python_udtf.rst: ########## @@ -0,0 +1,216 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +=========================================== +Python User-defined Table Functions (UDTFs) +=========================================== + +Spark 3.5 introduces Python user-defined table functions (UDTFs), a new type of user-defined function. +Unlike scalar functions that return a single result value for every input, UDTFs is invoked in the ``FROM`` +clause of a query and returns an entire relation as output. +Each UDTF call can accept zero or more arguments. +These arguments can either be scalar expressions or table arguments that represent entire input relations. + +Implementing a Python UDTF +-------------------------- + +.. 
currentmodule:: pyspark.sql.functions + +To implement a Python UDTF, you can define a class implementing the methods: + +.. code-block:: python + + class PythonUDTF: + + def __init__(self) -> None: + """ + Initialize the user-defined table function (UDTF). + + This method serves as the default constructor and is called once when the + UDTF is instantiated on the executor side. + + Any class fields assigned in this method will be available for subsequent + calls to the `eval` and `terminate` methods. + + Notes + ----- + - This method does not accept any extra arguments. + - You cannot create or reference the Spark session within the UDTF. Any + attempt to do so will result in a serialization error. + """ + ... + + def eval(self, *args: Any) -> Iterator[Any]: + """ + Evaluate the function using the given input arguments. + + This method is required and must be implemented. + + Argument Mapping: + - Each provided scalar expression maps to exactly one value in the + `*args` list. + - Each provided table argument maps to a pyspark.sql.Row object containing + the columns in the order they appear in the provided input relation. Review Comment: ```suggestion the columns in the order they appear in the provided input relation, and with the names computed by the query analyzer. ``` ########## python/docs/source/user_guide/sql/python_udtf.rst: ########## @@ -0,0 +1,216 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. 
Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +=========================================== +Python User-defined Table Functions (UDTFs) +=========================================== + +Spark 3.5 introduces Python user-defined table functions (UDTFs), a new type of user-defined function. +Unlike scalar functions that return a single result value for every input, UDTFs is invoked in the ``FROM`` +clause of a query and returns an entire relation as output. +Each UDTF call can accept zero or more arguments. +These arguments can either be scalar expressions or table arguments that represent entire input relations. + +Implementing a Python UDTF +-------------------------- + +.. currentmodule:: pyspark.sql.functions + +To implement a Python UDTF, you can define a class implementing the methods: + +.. code-block:: python + + class PythonUDTF: + + def __init__(self) -> None: + """ + Initialize the user-defined table function (UDTF). + + This method serves as the default constructor and is called once when the + UDTF is instantiated on the executor side. + + Any class fields assigned in this method will be available for subsequent + calls to the `eval` and `terminate` methods. + + Notes + ----- + - This method does not accept any extra arguments. + - You cannot create or reference the Spark session within the UDTF. Any + attempt to do so will result in a serialization error. + """ + ... + + def eval(self, *args: Any) -> Iterator[Any]: + """ + Evaluate the function using the given input arguments. + + This method is required and must be implemented. + + Argument Mapping: + - Each provided scalar expression maps to exactly one value in the + `*args` list. Review Comment: ```suggestion `*args` list. 
Or, if the `eval` method accepts `**kwargs`, these provide the corresponding argument names if the UDTF call passed SQL named arguments. ``` ########## python/docs/source/user_guide/sql/python_udtf.rst: ########## @@ -0,0 +1,216 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +=========================================== +Python User-defined Table Functions (UDTFs) +=========================================== + +Spark 3.5 introduces Python user-defined table functions (UDTFs), a new type of user-defined function. +Unlike scalar functions that return a single result value for every input, UDTFs is invoked in the ``FROM`` +clause of a query and returns an entire relation as output. +Each UDTF call can accept zero or more arguments. +These arguments can either be scalar expressions or table arguments that represent entire input relations. + +Implementing a Python UDTF +-------------------------- + +.. currentmodule:: pyspark.sql.functions + +To implement a Python UDTF, you can define a class implementing the methods: + +.. code-block:: python + + class PythonUDTF: + + def __init__(self) -> None: + """ + Initialize the user-defined table function (UDTF). 
+ + This method serves as the default constructor and is called once when the + UDTF is instantiated on the executor side. + + Any class fields assigned in this method will be available for subsequent + calls to the `eval` and `terminate` methods. + + Notes + ----- + - This method does not accept any extra arguments. + - You cannot create or reference the Spark session within the UDTF. Any + attempt to do so will result in a serialization error. + """ + ... + + def eval(self, *args: Any) -> Iterator[Any]: + """ + Evaluate the function using the given input arguments. + + This method is required and must be implemented. + + Argument Mapping: + - Each provided scalar expression maps to exactly one value in the + `*args` list. + - Each provided table argument maps to a pyspark.sql.Row object containing + the columns in the order they appear in the provided input relation. + + This method is called on every input row, and can produce zero or more + output rows. Each element in the output tuple corresponds to one column + specified in the return type of the UDTF. + + Parameters + ---------- + *args : Any + Arbitrary positional arguments representing the input to the UDTF. + + Yields + ------ + tuple + A tuple representing a single row in the UDTF result relation. + Yield as many times as needed to produce multiple rows. + + Notes + ----- + - The result of the function must be a tuple representing a single row + in the UDTF result relation. + - UDTFs currently do not accept keyword arguments during the function call. + + Examples + -------- + >>> def eval(self, x: int, y: int) -> Iterator[Any]: Review Comment: This example does not use `*args` but accepts the exact arguments as named parameters instead :) can we mention this is possible in the above description as well, and add an example with `*args` and `**kwargs` as well, since all three ways are supported? ########## python/docs/source/user_guide/sql/python_udtf.rst: ########## @@ -0,0 +1,216 @@ +.. 
Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +=========================================== +Python User-defined Table Functions (UDTFs) +=========================================== + +Spark 3.5 introduces Python user-defined table functions (UDTFs), a new type of user-defined function. +Unlike scalar functions that return a single result value for every input, UDTFs is invoked in the ``FROM`` +clause of a query and returns an entire relation as output. +Each UDTF call can accept zero or more arguments. +These arguments can either be scalar expressions or table arguments that represent entire input relations. + +Implementing a Python UDTF +-------------------------- + +.. currentmodule:: pyspark.sql.functions + +To implement a Python UDTF, you can define a class implementing the methods: + +.. code-block:: python + + class PythonUDTF: + + def __init__(self) -> None: + """ + Initialize the user-defined table function (UDTF). + + This method serves as the default constructor and is called once when the + UDTF is instantiated on the executor side. + + Any class fields assigned in this method will be available for subsequent + calls to the `eval` and `terminate` methods. + + Notes + ----- + - This method does not accept any extra arguments. 
+ - You cannot create or reference the Spark session within the UDTF. Any + attempt to do so will result in a serialization error. + """ + ... + + def eval(self, *args: Any) -> Iterator[Any]: + """ + Evaluate the function using the given input arguments. + + This method is required and must be implemented. + + Argument Mapping: + - Each provided scalar expression maps to exactly one value in the + `*args` list. + - Each provided table argument maps to a pyspark.sql.Row object containing + the columns in the order they appear in the provided input relation. + + This method is called on every input row, and can produce zero or more + output rows. Each element in the output tuple corresponds to one column + specified in the return type of the UDTF. + + Parameters + ---------- + *args : Any + Arbitrary positional arguments representing the input to the UDTF. + + Yields + ------ + tuple + A tuple representing a single row in the UDTF result relation. + Yield as many times as needed to produce multiple rows. + + Notes + ----- + - The result of the function must be a tuple representing a single row + in the UDTF result relation. + - UDTFs currently do not accept keyword arguments during the function call. + + Examples + -------- + >>> def eval(self, x: int, y: int) -> Iterator[Any]: + >>> yield x + y, x - y + >>> yield y + x, y - x + """ + ... + + def terminate(self) -> Iterator[Any]: + """ + Called when the UDTF has processed all input rows. + + This method is optional to implement and is useful for performing any + cleanup or finalization operations after the UDTF has finished processing + all rows. It can also be used to yield additional rows if needed. Review Comment: ```suggestion all rows. It can also be used to yield additional rows if needed. Table functions that consume all rows in the entire input partition and then compute and return the entire output table can do so from this method as well (please be mindful of memory usage when doing this). 
``` ########## python/docs/source/user_guide/sql/python_udtf.rst: ########## @@ -0,0 +1,216 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +=========================================== +Python User-defined Table Functions (UDTFs) +=========================================== + +Spark 3.5 introduces Python user-defined table functions (UDTFs), a new type of user-defined function. +Unlike scalar functions that return a single result value for every input, UDTFs is invoked in the ``FROM`` +clause of a query and returns an entire relation as output. +Each UDTF call can accept zero or more arguments. +These arguments can either be scalar expressions or table arguments that represent entire input relations. + +Implementing a Python UDTF +-------------------------- + +.. currentmodule:: pyspark.sql.functions + +To implement a Python UDTF, you can define a class implementing the methods: + +.. code-block:: python + + class PythonUDTF: + + def __init__(self) -> None: + """ + Initialize the user-defined table function (UDTF). Review Comment: ```suggestion Initializes the user-defined table function (UDTF). This is optional. ``` ########## python/docs/source/user_guide/sql/python_udtf.rst: ########## @@ -0,0 +1,216 @@ +.. 
Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +=========================================== +Python User-defined Table Functions (UDTFs) +=========================================== + +Spark 3.5 introduces Python user-defined table functions (UDTFs), a new type of user-defined function. +Unlike scalar functions that return a single result value for every input, UDTFs is invoked in the ``FROM`` +clause of a query and returns an entire relation as output. +Each UDTF call can accept zero or more arguments. +These arguments can either be scalar expressions or table arguments that represent entire input relations. + +Implementing a Python UDTF +-------------------------- + +.. currentmodule:: pyspark.sql.functions + +To implement a Python UDTF, you can define a class implementing the methods: + +.. code-block:: python + + class PythonUDTF: + + def __init__(self) -> None: + """ + Initialize the user-defined table function (UDTF). + + This method serves as the default constructor and is called once when the + UDTF is instantiated on the executor side. + + Any class fields assigned in this method will be available for subsequent + calls to the `eval` and `terminate` methods. + + Notes + ----- + - This method does not accept any extra arguments. 
+ - You cannot create or reference the Spark session within the UDTF. Any + attempt to do so will result in a serialization error. + """ + ... + + def eval(self, *args: Any) -> Iterator[Any]: + """ + Evaluate the function using the given input arguments. Review Comment: ```suggestion Evaluates the function using the given input arguments. ``` ########## python/docs/source/user_guide/sql/python_udtf.rst: ########## @@ -0,0 +1,216 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +=========================================== +Python User-defined Table Functions (UDTFs) +=========================================== + +Spark 3.5 introduces Python user-defined table functions (UDTFs), a new type of user-defined function. +Unlike scalar functions that return a single result value for every input, UDTFs is invoked in the ``FROM`` +clause of a query and returns an entire relation as output. +Each UDTF call can accept zero or more arguments. +These arguments can either be scalar expressions or table arguments that represent entire input relations. + +Implementing a Python UDTF +-------------------------- + +.. currentmodule:: pyspark.sql.functions + +To implement a Python UDTF, you can define a class implementing the methods: + +.. 
code-block:: python + + class PythonUDTF: + + def __init__(self) -> None: + """ + Initialize the user-defined table function (UDTF). + + This method serves as the default constructor and is called once when the + UDTF is instantiated on the executor side. + + Any class fields assigned in this method will be available for subsequent + calls to the `eval` and `terminate` methods. Review Comment: ```suggestion calls to the `eval` and `terminate` methods. This class instance will remain alive until all rows in the current partition have been consumed by the `eval` method. ``` ########## python/docs/source/user_guide/sql/python_udtf.rst: ########## @@ -0,0 +1,216 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +=========================================== +Python User-defined Table Functions (UDTFs) +=========================================== + +Spark 3.5 introduces Python user-defined table functions (UDTFs), a new type of user-defined function. +Unlike scalar functions that return a single result value for every input, UDTFs is invoked in the ``FROM`` +clause of a query and returns an entire relation as output. +Each UDTF call can accept zero or more arguments. 
+These arguments can either be scalar expressions or table arguments that represent entire input relations. + +Implementing a Python UDTF +-------------------------- + +.. currentmodule:: pyspark.sql.functions + +To implement a Python UDTF, you can define a class implementing the methods: + +.. code-block:: python + + class PythonUDTF: + + def __init__(self) -> None: + """ + Initialize the user-defined table function (UDTF). + + This method serves as the default constructor and is called once when the + UDTF is instantiated on the executor side. + + Any class fields assigned in this method will be available for subsequent + calls to the `eval` and `terminate` methods. + + Notes + ----- + - This method does not accept any extra arguments. Review Comment: ```suggestion - This method does not accept any extra arguments. Only the default constructor is supported. ``` ########## python/docs/source/user_guide/sql/python_udtf.rst: ########## @@ -0,0 +1,216 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +=========================================== +Python User-defined Table Functions (UDTFs) +=========================================== + +Spark 3.5 introduces Python user-defined table functions (UDTFs), a new type of user-defined function. 
+Unlike scalar functions that return a single result value for every input, UDTFs is invoked in the ``FROM`` +clause of a query and returns an entire relation as output. +Each UDTF call can accept zero or more arguments. +These arguments can either be scalar expressions or table arguments that represent entire input relations. + +Implementing a Python UDTF +-------------------------- + +.. currentmodule:: pyspark.sql.functions + +To implement a Python UDTF, you can define a class implementing the methods: + +.. code-block:: python + + class PythonUDTF: + + def __init__(self) -> None: + """ + Initialize the user-defined table function (UDTF). + + This method serves as the default constructor and is called once when the + UDTF is instantiated on the executor side. + + Any class fields assigned in this method will be available for subsequent + calls to the `eval` and `terminate` methods. + + Notes + ----- + - This method does not accept any extra arguments. + - You cannot create or reference the Spark session within the UDTF. Any + attempt to do so will result in a serialization error. + """ + ... + + def eval(self, *args: Any) -> Iterator[Any]: + """ + Evaluate the function using the given input arguments. + + This method is required and must be implemented. + + Argument Mapping: + - Each provided scalar expression maps to exactly one value in the + `*args` list. + - Each provided table argument maps to a pyspark.sql.Row object containing + the columns in the order they appear in the provided input relation. + + This method is called on every input row, and can produce zero or more + output rows. Each element in the output tuple corresponds to one column + specified in the return type of the UDTF. + + Parameters + ---------- + *args : Any + Arbitrary positional arguments representing the input to the UDTF. + + Yields + ------ + tuple + A tuple representing a single row in the UDTF result relation. + Yield as many times as needed to produce multiple rows. 
+ + Notes + ----- + - The result of the function must be a tuple representing a single row + in the UDTF result relation. + - UDTFs currently do not accept keyword arguments during the function call. + + Examples + -------- + >>> def eval(self, x: int, y: int) -> Iterator[Any]: + >>> yield x + y, x - y + >>> yield y + x, y - x + """ + ... + + def terminate(self) -> Iterator[Any]: + """ + Called when the UDTF has processed all input rows. + + This method is optional to implement and is useful for performing any + cleanup or finalization operations after the UDTF has finished processing + all rows. It can also be used to yield additional rows if needed. + + Yields + ------ + tuple + A tuple representing a single row in the UDTF result relation. + Yield this if you want to return additional rows during termination. + + Examples + -------- + >>> def terminate(self) -> Iterator[Any]: + >>> yield "done", None + """ + ... + + +The return type of the UDTF defines the schema of the table it outputs. +It must be either a ``StructType`` or a DDL string representing a struct type. + +**Example UDTF Implementation:** + +Here is a simple example of a UDTF implementation: + +.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py + :language: python + :lines: 36-40 + :dedent: 4 + + +**Instantiating the UDTF:** + +To make use of the UDTF, you'll first need to instantiate it using the :func:`udtf` function: + +.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py + :language: python + :lines: 42-55 + :dedent: 4 + + +**Using `udtf` as a Decorator:** + +An alternative way for implementing a UDTF is to use :func:`udtf` as a decorator: + +.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py + :language: python + :lines: 60-77 + :dedent: 4 + +For more detailed usage, please see :func:`udtf`. 
+ + +Registering and Using Python UDTFs in SQL +----------------------------------------- + +Python UDTFs can also be registered and used in SQL queries. + +.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py + :language: python + :lines: 82-116 + :dedent: 4 + + +Arrow Optimization +------------------ +Apache Arrow is an in-memory columnar data format used in Spark to efficiently transfer +data between JVM and Python processes. Apache Arrow is disabled by default for Python UDTFs. + +To enable Arrow optimization, set ``spark.sql.execution.pythonUDTF.arrow.enabled`` to ``true``. +Alternatively, you can specify the ``useArrow`` parameter when declaring the UDTF: + +.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py + :language: python + :lines: 121-126 + :dedent: 4 + + +For more details, please see `Apache Arrow in PySpark <../arrow_pandas.rst>`_. + + +More Examples +------------- + +A Python UDTF that expands date ranges into individual dates: + +.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py + :language: python + :lines: 131-152 + :dedent: 4 + + +A Python UDTF with ``__init__`` and ``terminate``: + +.. literalinclude:: ../../../../../examples/src/main/python/sql/udtf.py + :language: python + :lines: 157-176 + :dedent: 4 + + +Advanced Featuress +------------------ + +TABLE input argument Review Comment: I would recommend proposing this as the primary way of passing relation arguments, rather than in the "additional features" section, since this syntax conforms to the SQL standard. One way is to just swap the LATERAL syntax to this "advanced features" section instead. ########## python/docs/source/user_guide/sql/python_udtf.rst: ########## @@ -0,0 +1,216 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +=========================================== +Python User-defined Table Functions (UDTFs) +=========================================== + +Spark 3.5 introduces Python user-defined table functions (UDTFs), a new type of user-defined function. +Unlike scalar functions that return a single result value for every input, UDTFs is invoked in the ``FROM`` +clause of a query and returns an entire relation as output. +Each UDTF call can accept zero or more arguments. +These arguments can either be scalar expressions or table arguments that represent entire input relations. + +Implementing a Python UDTF +-------------------------- + +.. currentmodule:: pyspark.sql.functions + +To implement a Python UDTF, you can define a class implementing the methods: + +.. code-block:: python + + class PythonUDTF: + + def __init__(self) -> None: + """ + Initialize the user-defined table function (UDTF). + + This method serves as the default constructor and is called once when the + UDTF is instantiated on the executor side. + + Any class fields assigned in this method will be available for subsequent + calls to the `eval` and `terminate` methods. + + Notes + ----- + - This method does not accept any extra arguments. + - You cannot create or reference the Spark session within the UDTF. Any + attempt to do so will result in a serialization error. + """ + ... 
+ + def eval(self, *args: Any) -> Iterator[Any]: + """ + Evaluate the function using the given input arguments. + + This method is required and must be implemented. + + Argument Mapping: + - Each provided scalar expression maps to exactly one value in the + `*args` list. + - Each provided table argument maps to a pyspark.sql.Row object containing + the columns in the order they appear in the provided input relation. + + This method is called on every input row, and can produce zero or more + output rows. Each element in the output tuple corresponds to one column + specified in the return type of the UDTF. + + Parameters + ---------- + *args : Any + Arbitrary positional arguments representing the input to the UDTF. + + Yields + ------ + tuple + A tuple representing a single row in the UDTF result relation. + Yield as many times as needed to produce multiple rows. + + Notes + ----- + - The result of the function must be a tuple representing a single row + in the UDTF result relation. + - UDTFs currently do not accept keyword arguments during the function call. + + Examples + -------- + >>> def eval(self, x: int, y: int) -> Iterator[Any]: + >>> yield x + y, x - y + >>> yield y + x, y - x + """ + ... + + def terminate(self) -> Iterator[Any]: + """ + Called when the UDTF has processed all input rows. + + This method is optional to implement and is useful for performing any + cleanup or finalization operations after the UDTF has finished processing + all rows. It can also be used to yield additional rows if needed. + + Yields + ------ + tuple + A tuple representing a single row in the UDTF result relation. + Yield this if you want to return additional rows during termination. + + Examples + -------- + >>> def terminate(self) -> Iterator[Any]: + >>> yield "done", None + """ + ... + + +The return type of the UDTF defines the schema of the table it outputs. +It must be either a ``StructType`` or a DDL string representing a struct type. 
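As a hedged illustration of that last point (the column names below are invented; only the string form is exercised, since actually parsing either schema spelling happens inside Spark):

```python
# DDL-string form of a two-column output schema -- the spelling that the
# examples in this PR pass as returnType:
ddl_schema = "word: string, length: int"

# Equivalent StructType form (needs pyspark, so shown as a comment only):
#   from pyspark.sql.types import StructType, StructField, StringType, IntegerType
#   schema = StructType([
#       StructField("word", StringType()),
#       StructField("length", IntegerType()),
#   ])

# Light sanity check on the string form: names and types pair up.
columns = [c.split(":") for c in ddl_schema.split(",")]
names = [n.strip() for n, _ in columns]
types = [t.strip() for _, t in columns]
# names == ["word", "length"]; types == ["string", "int"]
```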
Review Comment: should we put an example with this DDL string as well? It looks useful :) ########## python/docs/source/user_guide/sql/python_udtf.rst: ########## @@ -0,0 +1,216 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +=========================================== +Python User-defined Table Functions (UDTFs) +=========================================== + +Spark 3.5 introduces Python user-defined table functions (UDTFs), a new type of user-defined function. +Unlike scalar functions that return a single result value for every input, UDTFs is invoked in the ``FROM`` +clause of a query and returns an entire relation as output. +Each UDTF call can accept zero or more arguments. +These arguments can either be scalar expressions or table arguments that represent entire input relations. + +Implementing a Python UDTF +-------------------------- + +.. currentmodule:: pyspark.sql.functions + +To implement a Python UDTF, you can define a class implementing the methods: + +.. code-block:: python + + class PythonUDTF: + + def __init__(self) -> None: + """ + Initialize the user-defined table function (UDTF). + + This method serves as the default constructor and is called once when the + UDTF is instantiated on the executor side. 
+ + Any class fields assigned in this method will be available for subsequent + calls to the `eval` and `terminate` methods. + + Notes + ----- + - This method does not accept any extra arguments. + - You cannot create or reference the Spark session within the UDTF. Any + attempt to do so will result in a serialization error. + """ + ... + + def eval(self, *args: Any) -> Iterator[Any]: + """ + Evaluate the function using the given input arguments. + + This method is required and must be implemented. + + Argument Mapping: + - Each provided scalar expression maps to exactly one value in the + `*args` list. + - Each provided table argument maps to a pyspark.sql.Row object containing + the columns in the order they appear in the provided input relation. + + This method is called on every input row, and can produce zero or more + output rows. Each element in the output tuple corresponds to one column + specified in the return type of the UDTF. + + Parameters + ---------- + *args : Any + Arbitrary positional arguments representing the input to the UDTF. + + Yields + ------ + tuple + A tuple representing a single row in the UDTF result relation. + Yield as many times as needed to produce multiple rows. + + Notes + ----- + - The result of the function must be a tuple representing a single row + in the UDTF result relation. + - UDTFs currently do not accept keyword arguments during the function call. Review Comment: this is now obsolete since @ueshin implemented this functionality. ########## python/docs/source/user_guide/sql/python_udtf.rst: ########## @@ -0,0 +1,216 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +=========================================== +Python User-defined Table Functions (UDTFs) +=========================================== + +Spark 3.5 introduces Python user-defined table functions (UDTFs), a new type of user-defined function. +Unlike scalar functions that return a single result value for every input, UDTFs is invoked in the ``FROM`` +clause of a query and returns an entire relation as output. +Each UDTF call can accept zero or more arguments. +These arguments can either be scalar expressions or table arguments that represent entire input relations. + +Implementing a Python UDTF +-------------------------- + +.. currentmodule:: pyspark.sql.functions + +To implement a Python UDTF, you can define a class implementing the methods: + +.. code-block:: python + + class PythonUDTF: + + def __init__(self) -> None: + """ + Initialize the user-defined table function (UDTF). + + This method serves as the default constructor and is called once when the + UDTF is instantiated on the executor side. + + Any class fields assigned in this method will be available for subsequent + calls to the `eval` and `terminate` methods. + + Notes + ----- + - This method does not accept any extra arguments. + - You cannot create or reference the Spark session within the UDTF. Any + attempt to do so will result in a serialization error. + """ + ... + + def eval(self, *args: Any) -> Iterator[Any]: Review Comment: Should we document the polymorphic `analyze` method here as well? 
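The table-argument mapping described in that `eval` docstring can also be sketched without Spark; a plain dict stands in for `pyspark.sql.Row` here, and the class name and threshold are invented for illustration:

```python
class FilterBigIds:
    """eval receives one row object per input row of the table argument."""

    def eval(self, row):
        if row["id"] > 5:
            yield (row["id"],)  # one-tuple: a single-column output row

out = []
for r in ({"id": i} for i in range(10)):  # stand-in for a 10-row input table
    out.extend(FilterBigIds().eval(r))
# out == [(6,), (7,), (8,), (9,)]
```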
########## examples/src/main/python/sql/udtf.py: ########## @@ -0,0 +1,230 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +A simple example demonstrating Python UDTFs in Spark +Run with: + ./bin/spark-submit examples/src/main/python/sql/udtf.py +""" + +# NOTE that this file is imported in the User Guides in PySpark documentation. +# The codes are referred via line numbers. See also `literalinclude` directive in Sphinx. +from pyspark.sql import SparkSession +from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version + +# Python UDTFs use Arrow by default. +require_minimum_pandas_version() +require_minimum_pyarrow_version() + + +def python_udtf_simple_example(spark: SparkSession) -> None: + + # Define the UDTF class and implement the required `eval` method. + class SquareNumbers: + def eval(self, start: int, end: int): + for num in range(start, end + 1): + yield (num, num * num) + + from pyspark.sql.functions import lit, udtf + + # Create a UDTF using the class definition and the `udtf` function. + square_num = udtf(SquareNumbers, returnType="num: int, squared: int") + + # Invoke the UDTF in PySpark. 
+ square_num(lit(1), lit(3)).show() + # +---+-------+ + # |num|squared| + # +---+-------+ + # | 1| 1| + # | 2| 4| + # | 3| 9| + # +---+-------+ + + +def python_udtf_decorator_example(spark: SparkSession) -> None: + + from pyspark.sql.functions import lit, udtf + + # Define a UDTF using the `udtf` decorator directly on the class. + @udtf(returnType="num: int, squared: int") + class SquareNumbers: + def eval(self, start: int, end: int): + for num in range(start, end + 1): + yield (num, num * num) + + # Invoke the UDTF in PySpark using the SquareNumbers class directly. + SquareNumbers(lit(1), lit(3)).show() + # +---+-------+ + # |num|squared| + # +---+-------+ + # | 1| 1| + # | 2| 4| + # | 3| 9| + # +---+-------+ + + +def python_udtf_registration(spark: SparkSession) -> None: + + from pyspark.sql.functions import udtf + + @udtf(returnType="word: string") + class WordSplitter: + def eval(self, text: str): + for word in text.split(" "): + yield (word.strip(),) + + # Register the UDTF for use in Spark SQL. + spark.udtf.register("split_words", WordSplitter) + + # Example: Using the UDTF in SQL. + spark.sql("SELECT * FROM split_words('hello world')").show() + # +-----+ + # | word| + # +-----+ + # |hello| + # |world| + # +-----+ + + # Example: Using the UDTF with a lateral join in SQL. + # The lateral join allows us to reference the columns and aliases + # in the previous FROM clause items as inputs to the UDTF. 
+ spark.sql( + "SELECT * FROM VALUES ('Hello World'), ('Apache Spark') t(text), " + "LATERAL split_words(text)" + ).show() + # +------------+------+ + # | text| word| + # +------------+------+ + # | Hello World| Hello| + # | Hello World| World| + # |Apache Spark|Apache| + # |Apache Spark| Spark| + # +------------+------+ + + +def python_udtf_arrow_example(spark: SparkSession) -> None: + + from pyspark.sql.functions import udtf + + @udtf(returnType="c1: int, c2: int", useArrow=True) + class PlusOne: + def eval(self, x: int): + yield x, x + 1 + + +def python_udtf_date_expander_example(spark: SparkSession) -> None: + + from datetime import datetime, timedelta + from pyspark.sql.functions import lit, udtf + + @udtf(returnType="date: string") + class DateExpander: + def eval(self, start_date: str, end_date: str): + current = datetime.strptime(start_date, '%Y-%m-%d') + end = datetime.strptime(end_date, '%Y-%m-%d') + while current <= end: + yield (current.strftime('%Y-%m-%d'),) + current += timedelta(days=1) + + DateExpander(lit("2023-02-25"), lit("2023-03-01")).show() + # +----------+ + # | date| + # +----------+ + # |2023-02-25| + # |2023-02-26| + # |2023-02-27| + # |2023-02-28| + # |2023-03-01| + # +----------+ + + +def python_udtf_terminate_example(spark: SparkSession) -> None: + + from pyspark.sql.functions import udtf + + @udtf(returnType="cnt: int") + class CountUDTF: + def __init__(self): + self.count = 0 + + def eval(self, x: int): + self.count += 1 + + def terminate(self): + yield self.count, + + spark.udtf.register("count_udtf", CountUDTF) + spark.sql("SELECT * FROM range(0, 10, 1, 1), LATERAL count_udtf(id)").show() + # +---+---+ + # | id|cnt| + # +---+---+ + # | 9| 10| + # +---+---+ + + +def python_udtf_table_argument(spark: SparkSession) -> None: + + from pyspark.sql.functions import udtf + from pyspark.sql.types import Row + + @udtf(returnType="id: int") + class FilterUDTF: + def eval(self, row: Row): + if row["id"] > 5: + yield row["id"], + + 
spark.udtf.register("filter_udtf", FilterUDTF) + + spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show() Review Comment: this is good, let's also add an example just passing a table by name directly as well, e.g. `TABLE(t)`? ########## python/docs/source/user_guide/sql/python_udtf.rst: ########## @@ -0,0 +1,216 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +=========================================== +Python User-defined Table Functions (UDTFs) +=========================================== + +Spark 3.5 introduces Python user-defined table functions (UDTFs), a new type of user-defined function. +Unlike scalar functions that return a single result value for every input, UDTFs is invoked in the ``FROM`` +clause of a query and returns an entire relation as output. +Each UDTF call can accept zero or more arguments. +These arguments can either be scalar expressions or table arguments that represent entire input relations. + +Implementing a Python UDTF +-------------------------- + +.. currentmodule:: pyspark.sql.functions + +To implement a Python UDTF, you can define a class implementing the methods: + +.. 
code-block:: python + + class PythonUDTF: + + def __init__(self) -> None: + """ + Initialize the user-defined table function (UDTF). + + This method serves as the default constructor and is called once when the + UDTF is instantiated on the executor side. + + Any class fields assigned in this method will be available for subsequent + calls to the `eval` and `terminate` methods. + + Notes + ----- + - This method does not accept any extra arguments. + - You cannot create or reference the Spark session within the UDTF. Any + attempt to do so will result in a serialization error. + """ + ... + + def eval(self, *args: Any) -> Iterator[Any]: + """ + Evaluate the function using the given input arguments. + + This method is required and must be implemented. + + Argument Mapping: + - Each provided scalar expression maps to exactly one value in the + `*args` list. + - Each provided table argument maps to a pyspark.sql.Row object containing + the columns in the order they appear in the provided input relation. + + This method is called on every input row, and can produce zero or more + output rows. Each element in the output tuple corresponds to one column + specified in the return type of the UDTF. + + Parameters + ---------- + *args : Any + Arbitrary positional arguments representing the input to the UDTF. + + Yields + ------ + tuple + A tuple representing a single row in the UDTF result relation. + Yield as many times as needed to produce multiple rows. + + Notes + ----- + - The result of the function must be a tuple representing a single row + in the UDTF result relation. + - UDTFs currently do not accept keyword arguments during the function call. + + Examples + -------- + >>> def eval(self, x: int, y: int) -> Iterator[Any]: + >>> yield x + y, x - y + >>> yield y + x, y - x + """ + ... + + def terminate(self) -> Iterator[Any]: + """ + Called when the UDTF has processed all input rows. 
+ + This method is optional to implement and is useful for performing any + cleanup or finalization operations after the UDTF has finished processing + all rows. It can also be used to yield additional rows if needed. + + Yields + ------ + tuple + A tuple representing a single row in the UDTF result relation. + Yield this if you want to return additional rows during termination. Review Comment: should we mention here the tricky detail that you have to include a trailing comma when yielding a row of just one value (here and above in the `eval` description)?
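The trailing-comma detail flagged in that last comment is easy to show in plain Python (function names invented for illustration):

```python
def split_words(text: str):
    for word in text.split(" "):
        yield (word.strip(),)   # trailing comma: a one-tuple, i.e. one row

def split_words_bare(text: str):
    for word in text.split(" "):
        yield word.strip()      # bare string -- not a row tuple

rows = list(split_words("hello world"))
bare = list(split_words_bare("hello world"))
# rows == [("hello",), ("world",)]  -- one-column rows, as the UDTF machinery expects
# bare == ["hello", "world"]        -- bare values, which Spark would likely reject
```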