Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r145552397

--- Diff: python/pyspark/sql/functions.py ---
@@ -2192,67 +2208,82 @@ def pandas_udf(f=None, returnType=StringType()):
     :param f: user-defined function. A python function if used as a standalone function
     :param returnType: a :class:`pyspark.sql.types.DataType` object
-    The user-defined function can define one of the following transformations:
-
-    1. One or more `pandas.Series` -> A `pandas.Series`
-
-    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
-    :meth:`pyspark.sql.DataFrame.select`.
-    The returnType should be a primitive data type, e.g., `DoubleType()`.
-    The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
-
-    >>> from pyspark.sql.types import IntegerType, StringType
-    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
-    >>> @pandas_udf(returnType=StringType())
-    ... def to_upper(s):
-    ...     return s.str.upper()
-    ...
-    >>> @pandas_udf(returnType="integer")
-    ... def add_one(x):
-    ...     return x + 1
-    ...
-    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
-    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
-    ...     .show() # doctest: +SKIP
-    +----------+--------------+------------+
-    |slen(name)|to_upper(name)|add_one(age)|
-    +----------+--------------+------------+
-    |         8|      JOHN DOE|          22|
-    +----------+--------------+------------+
-
-    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
-
-    This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
-    The returnType should be a :class:`StructType` describing the schema of the returned
-    `pandas.DataFrame`.
-
-    >>> df = spark.createDataFrame(
-    ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
-    ...     ("id", "v"))
-    >>> @pandas_udf(returnType=df.schema)
-    ... def normalize(pdf):
-    ...     v = pdf.v
-    ...     return pdf.assign(v=(v - v.mean()) / v.std())
-    >>> df.groupby('id').apply(normalize).show() # doctest: +SKIP
-    +---+-------------------+
-    | id|                  v|
-    +---+-------------------+
-    |  1|-0.7071067811865475|
-    |  1| 0.7071067811865475|
-    |  2|-0.8320502943378437|
-    |  2|-0.2773500981126146|
-    |  2| 1.1094003924504583|
-    +---+-------------------+
-
-    .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
-        because it defines a `DataFrame` transformation rather than a `Column`
-        transformation.
-
-    .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
+    The user-defined function can define the following transformation:
+
+    One or more `pandas.Series` -> A `pandas.Series`
+
+    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
+    :meth:`pyspark.sql.DataFrame.select`.
+    The returnType should be a primitive data type, e.g., `DoubleType()`.
+    The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
+
+    >>> from pyspark.sql.types import IntegerType, StringType
+    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+    >>> @pandas_udf(returnType=StringType())
+    ... def to_upper(s):
+    ...     return s.str.upper()
+    ...
+    >>> @pandas_udf(returnType="integer")
+    ... def add_one(x):
+    ...     return x + 1
+    ...
+    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
+    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
+    ...     .show() # doctest: +SKIP
+    +----------+--------------+------------+
+    |slen(name)|to_upper(name)|add_one(age)|
+    +----------+--------------+------------+
+    |         8|      JOHN DOE|          22|
+    +----------+--------------+------------+
+
+    .. note:: The user-defined function must be deterministic.
+    """
+    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.PANDAS_UDF)
+
+
+@since(2.3)
+def pandas_grouped_udf(f=None, returnType=StructType()):
--- End diff --

Here is a summary of the current proposal:

I. Use only `pandas_udf`
------------------------

The main issue with this approach, as a few people have pointed out, is that it is hard to know what the udf does without looking at its implementation. For instance, for a udf:

```
@pandas_udf(DoubleType())
def foo(v):
    ...
```

it's hard to tell whether this function is a reduction that returns a scalar double, or a transform function that returns a pd.Series of doubles. This is less than ideal because:

* The user of the udf cannot tell which functions the udf can be used with, i.e., can it be used with `groupby().apply()`, `withColumn`, or `groupby().agg()`?
* Catalyst cannot do validation at the planning phase, i.e., it cannot throw an exception if the user passes a transformation function rather than an aggregation function to `groupby().agg()`.

II. Use different decorators, i.e., `pandas_udf` (or `pandas_scalar_udf`), `pandas_grouped_udf`, `pandas_udaf`
--------------------------------------------------------------------------------------------------------------

The idea of this approach is to use `pandas_grouped_udf` for all group udfs, and `pandas_scalar_udf` for scalar pandas udfs that get used with `withColumn`. This helps distinguish between scalar udfs and group udfs. However, this approach doesn't help distinguish among the group udfs themselves, for instance, the group transform and group aggregation examples above.

III. Use the `pandas_udf` decorator and a function type enum for "one-step" vectorized udfs, and `pandas_udaf` for multi-step aggregation functions
---------------------------------------------------------------------------------------------------------------------------------------------------

This approach uses a function type enum to describe what the udf does. Here are the proposed function types:

* transform: A pd.Series(s) -> pd.Series transformation that is independent of the grouping. This is the existing scalar pandas udf.

* group_transform: A pd.Series(s) -> pd.Series transformation that is dependent on the grouping, e.g.
```
@pandas_udf(DoubleType(), GROUP_TRANSFORM)
def foo(v):
    return (v - v.mean()) / v.std()
```

* group_aggregate: A pd.Series(s) -> scalar function, e.g.

```
@pandas_udf(DoubleType(), GROUP_AGGREGATE)
def foo(v):
    return v.mean()
```

* group_map (maybe a better name): This defines a pd.DataFrame -> pd.DataFrame transformation. This is the current `groupby().apply()` udf.

These types also work with window functions, because window functions are either (1) group_transform (e.g. rank) or (2) group_aggregate (e.g. first, last).

I am in favor of (3). What do you guys think?
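For concreteness, the ambiguity described in (1) can be sketched in plain pandas, outside the Spark API (`plus_one` and `mean_of` are hypothetical names): under option I, both functions below would carry the same `DoubleType()` annotation, yet one is a transform and the other is a reduction.

```
import pandas as pd

s = pd.Series([1.0, 2.0, 3.0])

# Under option I both of these would be declared @pandas_udf(DoubleType());
# nothing in the declaration says which returns a Series and which a scalar.
def plus_one(v):
    # transform: pd.Series -> pd.Series, same length as the input
    return v + 1.0

def mean_of(v):
    # reduction: pd.Series -> scalar double
    return v.mean()

print(plus_one(s).tolist())  # [2.0, 3.0, 4.0]
print(mean_of(s))            # 2.0
```

Only the runtime return value, not the declaration, reveals which kind of udf each one is.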
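The semantic differences among the proposed group types in (3) can likewise be sketched in plain pandas (not the Spark API; the data mirrors the `normalize` example from the diff, and the variable names are hypothetical):

```
import pandas as pd

df = pd.DataFrame({"id": [1, 1, 2, 2, 2],
                   "v": [1.0, 2.0, 3.0, 5.0, 10.0]})

# group_transform: one output row per input row, but grouping-dependent
normalized = df.groupby("id")["v"].transform(lambda v: (v - v.mean()) / v.std())

# group_aggregate: one output row per group
means = df.groupby("id")["v"].agg("mean")

# group_map: an arbitrary pd.DataFrame -> pd.DataFrame per group,
# analogous to the current groupby().apply() udf
shifted = df.groupby("id", group_keys=False).apply(
    lambda pdf: pdf.assign(v=pdf.v - pdf.v.min()))

print(len(normalized), len(means), len(shifted))  # 5 2 5
```

The output shapes are what the enum would make visible at declaration time: transforms preserve the row count, aggregates collapse each group to one row, and maps may return any shape.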