This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8299600  [SPARK-28056][.2][PYTHON][SQL] add docstring/doctest for SCALAR_ITER Pandas UDF
8299600 is described below

commit 8299600575024ca81127b7bf8ef48ae11fdd0594
Author: Xiangrui Meng <m...@databricks.com>
AuthorDate: Fri Jun 28 15:09:57 2019 -0700

    [SPARK-28056][.2][PYTHON][SQL] add docstring/doctest for SCALAR_ITER Pandas UDF
    
    ## What changes were proposed in this pull request?
    
    Add docstring/doctest for `SCALAR_ITER` Pandas UDF. I explicitly mentioned that per-partition execution is an implementation detail and is not guaranteed. I will submit another PR to add the same to the user guide, just to keep this PR minimal.
    
    I didn't add "doctest: +SKIP" in the first commit so it is easy to test locally.
    
    cc: HyukjinKwon gatorsmile icexelloss BryanCutler WeichenXu123
    
    ![Screen Shot 2019-06-28 at 9 52 41 AM](https://user-images.githubusercontent.com/829644/60358349-b0aa5400-998a-11e9-9ebf-8481dfd555b5.png)
    ![Screen Shot 2019-06-28 at 9 53 19 AM](https://user-images.githubusercontent.com/829644/60358355-b1db8100-998a-11e9-8f6f-00a11bdbdc4d.png)
    
    ## How was this patch tested?
    
    doctest
    
    Closes #25005 from mengxr/SPARK-28056.2.
    
    Authored-by: Xiangrui Meng <m...@databricks.com>
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
---
 python/pyspark/sql/functions.py | 104 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 102 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 34f6593..5d1e69e 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2951,7 +2951,107 @@ def pandas_udf(f=None, returnType=None, functionType=None):
           Therefore, this can be used, for example, to ensure the length of each returned
           `pandas.Series`, and can not be used as the column length.
 
-    2. GROUPED_MAP
+    2. SCALAR_ITER
+
+       A scalar iterator UDF is semantically the same as the scalar Pandas UDF above except that the
+       wrapped Python function takes an iterator of batches as input instead of a single batch and,
+       instead of returning a single output batch, it yields output batches or explicitly returns a
+       generator or an iterator of output batches.
+       It is useful when the UDF execution requires initializing some state, e.g., loading a machine
+       learning model file to apply inference to every input batch.
+
+       .. note:: It is not guaranteed that one invocation of a scalar iterator UDF will process all
+           batches from one partition, although it is currently implemented this way.
+           Your code shall not rely on this behavior because it might change in the future for
+           further optimization, e.g., one invocation processes multiple partitions.
+
+       Scalar iterator UDFs are used with :meth:`pyspark.sql.DataFrame.withColumn` and
+       :meth:`pyspark.sql.DataFrame.select`.
+
+       >>> import pandas as pd  # doctest: +SKIP
+       >>> from pyspark.sql.functions import col, pandas_udf, struct, PandasUDFType
+       >>> pdf = pd.DataFrame([1, 2, 3], columns=["x"])  # doctest: +SKIP
+       >>> df = spark.createDataFrame(pdf)  # doctest: +SKIP
+
+       When the UDF is called with a single column that is not `StructType`, the input to the
+       underlying function is an iterator of `pd.Series`.
+
+       >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
+       ... def plus_one(batch_iter):
+       ...     for x in batch_iter:
+       ...         yield x + 1
+       ...
+       >>> df.select(plus_one(col("x"))).show()  # doctest: +SKIP
+       +-----------+
+       |plus_one(x)|
+       +-----------+
+       |          2|
+       |          3|
+       |          4|
+       +-----------+
+
+       When the UDF is called with more than one column, the input to the underlying function is an
+       iterator of tuples of `pd.Series`.
+
+       >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
+       ... def multiply_two_cols(batch_iter):
+       ...     for a, b in batch_iter:
+       ...         yield a * b
+       ...
+       >>> df.select(multiply_two_cols(col("x"), col("x"))).show()  # doctest: +SKIP
+       +-----------------------+
+       |multiply_two_cols(x, x)|
+       +-----------------------+
+       |                      1|
+       |                      4|
+       |                      9|
+       +-----------------------+
+
+       When the UDF is called with a single column that is `StructType`, the input to the underlying
+       function is an iterator of `pd.DataFrame`.
+
+       >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
+       ... def multiply_two_nested_cols(pdf_iter):
+       ...     for pdf in pdf_iter:
+       ...         yield pdf["a"] * pdf["b"]
+       ...
+       >>> df.select(
+       ...     multiply_two_nested_cols(
+       ...         struct(col("x").alias("a"), col("x").alias("b"))
+       ...     ).alias("y")
+       ... ).show()  # doctest: +SKIP
+       +---+
+       |  y|
+       +---+
+       |  1|
+       |  4|
+       |  9|
+       +---+
+
+       In the UDF, you can initialize some state before processing batches. Wrap your code with
+       `try ... finally ...` or use context managers to ensure the release of resources at the end
+       or in case of early termination.
+
+       >>> y_bc = spark.sparkContext.broadcast(1)  # doctest: +SKIP
+       >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
+       ... def plus_y(batch_iter):
+       ...     y = y_bc.value  # initialize some state
+       ...     try:
+       ...         for x in batch_iter:
+       ...             yield x + y
+       ...     finally:
+       ...         pass  # release resources here, if any
+       ...
+       >>> df.select(plus_y(col("x"))).show()  # doctest: +SKIP
+       +---------+
+       |plus_y(x)|
+       +---------+
+       |        2|
+       |        3|
+       |        4|
+       +---------+
+
+    3. GROUPED_MAP
 
        A grouped map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`
        The returnType should be a :class:`StructType` describing the schema of the returned
@@ -3030,7 +3130,7 @@ def pandas_udf(f=None, returnType=None, functionType=None):
 
        .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
 
-    3. GROUPED_AGG
+    4. GROUPED_AGG
 
        A grouped aggregate UDF defines a transformation: One or more `pandas.Series` -> A scalar
        The `returnType` should be a primitive data type, e.g., :class:`DoubleType`.
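
The `try ... finally` guidance and the per-invocation note in the new docstring can be illustrated without Spark. What follows is a minimal plain-Python sketch, not code from this commit: plain lists stand in for `pd.Series` batches, and the `events` list and this stand-in `plus_y` are hypothetical names introduced only for illustration. It relies on nothing beyond standard Python generator semantics and makes no claim about how Spark itself drives the function.

```python
from typing import Iterable, Iterator, List

events: List[str] = []  # hypothetical log of when setup and cleanup run


def plus_y(batch_iter: Iterable[List[int]]) -> Iterator[List[int]]:
    """Plain-Python stand-in for a SCALAR_ITER function body (no Spark, no pandas)."""
    events.append("setup")  # one-time state initialization, e.g. loading a model
    y = 1
    try:
        for batch in batch_iter:
            yield [x + y for x in batch]  # one output batch per input batch
    finally:
        events.append("cleanup")  # runs on exhaustion and on early close()


batches = [[1, 2], [3], [4, 5]]

# Full consumption: setup runs once, then cleanup runs once at the end.
full = list(plus_y(batches))

# Early termination: the consumer takes one batch and closes the generator.
gen = plus_y(batches)
first = next(gen)
gen.close()  # raises GeneratorExit at the paused yield, so the finally block runs

# Splitting the same batches across two invocations yields the same outputs,
# so the results do not depend on how batches are grouped per invocation.
split = list(plus_y(batches[:1])) + list(plus_y(batches[1:]))
```

Each invocation in this sketch records exactly one `setup` and one `cleanup`, including the one that is closed early. Because the function keeps no state that depends on seeing a whole partition, `split` equals `full`, which is the property the docstring note asks UDF authors to preserve.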

