Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r145552397
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2192,67 +2208,82 @@ def pandas_udf(f=None, returnType=StringType()):
     :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
     
    -    The user-defined function can define one of the following transformations:
    -
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    -
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    -       The returnType should be a primitive data type, e.g., `DoubleType()`.
    -       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    -
    -       >>> from pyspark.sql.types import IntegerType, StringType
    -       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -       >>> @pandas_udf(returnType=StringType())
    -       ... def to_upper(s):
    -       ...     return s.str.upper()
    -       ...
    -       >>> @pandas_udf(returnType="integer")
    -       ... def add_one(x):
    -       ...     return x + 1
    -       ...
    -       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -       ...     .show()  # doctest: +SKIP
    -       +----------+--------------+------------+
    -       |slen(name)|to_upper(name)|add_one(age)|
    -       +----------+--------------+------------+
    -       |         8|      JOHN DOE|          22|
    -       +----------+--------------+------------+
    -
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    -
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    -       The returnType should be a :class:`StructType` describing the schema of the returned
    -       `pandas.DataFrame`.
    -
    -       >>> df = spark.createDataFrame(
    -       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    -       ...     ("id", "v"))
    -       >>> @pandas_udf(returnType=df.schema)
    -       ... def normalize(pdf):
    -       ...     v = pdf.v
    -       ...     return pdf.assign(v=(v - v.mean()) / v.std())
    -       >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    -       +---+-------------------+
    -       | id|                  v|
    -       +---+-------------------+
    -       |  1|-0.7071067811865475|
    -       |  1| 0.7071067811865475|
    -       |  2|-0.8320502943378437|
    -       |  2|-0.2773500981126146|
    -       |  2| 1.1094003924504583|
    -       +---+-------------------+
    -
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
    -       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
    +    The user-defined function can define the following transformation:
    +
    +    One or more `pandas.Series` -> A `pandas.Series`
    +
    +    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +    :meth:`pyspark.sql.DataFrame.select`.
    +    The returnType should be a primitive data type, e.g., `DoubleType()`.
    +    The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    +
    +    >>> from pyspark.sql.types import IntegerType, StringType
    +    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    +    >>> @pandas_udf(returnType=StringType())
    +    ... def to_upper(s):
    +    ...     return s.str.upper()
    +    ...
    +    >>> @pandas_udf(returnType="integer")
    +    ... def add_one(x):
    +    ...     return x + 1
    +    ...
    +    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    +    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    +    ...     .show()  # doctest: +SKIP
    +    +----------+--------------+------------+
    +    |slen(name)|to_upper(name)|add_one(age)|
    +    +----------+--------------+------------+
    +    |         8|      JOHN DOE|          22|
    +    +----------+--------------+------------+
    +
    +    .. note:: The user-defined function must be deterministic.
    +    """
    +    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.PANDAS_UDF)
    +
    +
    +@since(2.3)
    +def pandas_grouped_udf(f=None, returnType=StructType()):
    --- End diff ---
    
    Here is a summary of the current proposal:
    
    I. Use only `pandas_udf`
    --------------------------
    The main issue with this approach, as a few people have pointed out, is that it is hard to know what the udf does without looking at the implementation.
    For instance, for a udf:
    ```
    @pandas_udf(DoubleType())
    def foo(v):
          ...
    ``` 
    It's hard to tell whether this function is a reduction that returns a scalar double, or a transform function that returns a pd.Series of doubles.
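    
    As an illustration (both function bodies below are invented for this example, not taken from the PR), these two udfs are indistinguishable from their decorators alone, yet one is an aggregation and the other a transform:
    ```
    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.types import DoubleType

    @pandas_udf(DoubleType())
    def mean_v(v):
        # Reduction: collapses the input pd.Series to a single scalar double.
        return v.mean()

    @pandas_udf(DoubleType())
    def plus_one(v):
        # Transform: returns a pd.Series with the same length as the input.
        return v + 1
    ```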
    
    This is less than ideal because:
    * The user of the udf cannot tell which functions this udf can be used with, i.e., can it be used with `groupby().apply()`, `withColumn`, or `groupby().agg()`?
    * Catalyst cannot do validation at the planning phase, i.e., it cannot throw an exception if the user passes a transformation function rather than an aggregation function to `groupby().agg()`.
    
    II. Use different decorators, i.e., `pandas_udf` (or `pandas_scalar_udf`), `pandas_grouped_udf`, `pandas_udaf`
    
----------------------------------------------------------------------------------------------------------------
    The idea of this approach is to use `pandas_grouped_udf` for all group udfs, and `pandas_scalar_udf` for scalar pandas udfs that get used with `withColumn`. This helps distinguish between scalar udfs and group udfs. However, this approach doesn't help distinguish among group udfs, for instance between the group transform and group aggregation examples above.
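    
    A rough sketch of what this option could look like (`pandas_scalar_udf` and `pandas_grouped_udf` are only proposed names here, not existing APIs; `df` is assumed to be the DataFrame from the diff above):
    ```
    from pyspark.sql.types import DoubleType

    # Proposed decorator for scalar udfs used with withColumn/select.
    @pandas_scalar_udf(DoubleType())
    def plus_one(v):
        return v + 1

    # Proposed decorator for all group udfs. The decorator alone still
    # cannot tell a group transform from a group aggregation.
    @pandas_grouped_udf(df.schema)
    def normalize(pdf):
        v = pdf.v
        return pdf.assign(v=(v - v.mean()) / v.std())
    ```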
     
    III. Use the `pandas_udf` decorator and a function type enum for "one-step" vectorized udfs and `pandas_udaf` for multi-step aggregation functions
    ----------------------------------------------------------
    This approach uses a function type enum to describe what the udf does. Here are the proposed function types:
    * transform
    A pd.Series(s) -> pd.Series transformation that is independent of the grouping. This is the existing scalar pandas udf.
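    e.g. (a sketch; `TRANSFORM` would be a proposed enum value, parallel to the ones below):
    ```
    @pandas_udf(DoubleType(), TRANSFORM)
    def foo(v):
        return v + 1
    ```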
    * group_transform
    A pd.Series(s) -> pd.Series transformation that is dependent on the grouping, e.g.
    ```
    @pandas_udf(DoubleType(), GROUP_TRANSFORM)
    def foo(v):
        return (v - v.mean()) / v.std()
    ```
    * group_aggregate
    A pd.Series(s) -> scalar function, e.g.
    ```
    @pandas_udf(DoubleType(), GROUP_AGGREGATE)
    def foo(v):
        return v.mean()
    ```
    * group_map (maybe a better name)
    This defines a pd.DataFrame -> pd.DataFrame transformation. This is the current `groupby().apply()` udf.
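    e.g. (a sketch; `GROUP_MAP` would be a proposed enum value, and `normalize` is the example from the diff above):
    ```
    @pandas_udf(df.schema, GROUP_MAP)
    def normalize(pdf):
        v = pdf.v
        return pdf.assign(v=(v - v.mean()) / v.std())
    ```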
    
    These types also work with window functions, because window functions are either (1) group_transform (e.g., rank) or (2) group_aggregate (e.g., first, last).
    
    I am in favor of (III). What do you guys think?


