Repository: spark
Updated Branches:
  refs/heads/master f1550aaf1 -> 8141d5592
[SPARK-23633][SQL] Update Pandas UDFs section in sql-programming-guide

## What changes were proposed in this pull request?

Update the Pandas UDFs section in sql-programming-guide. Add a section for grouped aggregate Pandas UDFs.

## How was this patch tested?

Author: Li Jin <ice.xell...@gmail.com>

Closes #21887 from icexelloss/SPARK-23633-sql-programming-guide.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8141d559
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8141d559
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8141d559

Branch: refs/heads/master
Commit: 8141d55926e95c06cd66bf82098895e1ed419449
Parents: f1550aa
Author: Li Jin <ice.xell...@gmail.com>
Authored: Tue Jul 31 10:10:38 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Tue Jul 31 10:10:38 2018 +0800

----------------------------------------------------------------------
 docs/sql-programming-guide.md         | 19 +++++++++++++++
 examples/src/main/python/sql/arrow.py | 37 ++++++++++++++++++++++++++++++
 python/pyspark/sql/functions.py       |  5 ++--
 3 files changed, 59 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8141d559/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index cff521c..5f1eee8 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1811,6 +1811,25 @@ The following example shows how to use `groupby().apply()` to subtract the mean
 For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) and
 [`pyspark.sql.GroupedData.apply`](api/python/pyspark.sql.html#pyspark.sql.GroupedData.apply).
 
+### Grouped Aggregate
+
+Grouped aggregate Pandas UDFs are similar to Spark aggregate functions. They are used with `groupBy().agg()` and
+[`pyspark.sql.Window`](api/python/pyspark.sql.html#pyspark.sql.Window). A grouped aggregate Pandas UDF defines an aggregation from one or more `pandas.Series`
+to a scalar value, where each `pandas.Series` represents a column within the group or window.
+
+Note that this type of UDF does not support partial aggregation, and all data for a group or window will be loaded into memory. Also,
+only unbounded windows are currently supported with grouped aggregate Pandas UDFs.
+
+The following example shows how to use this type of UDF to compute the mean with groupBy and window operations:
+
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% include_example grouped_agg_pandas_udf python/sql/arrow.py %}
+</div>
+</div>
+
+For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf).
+
 ## Usage Notes
 
 ### Supported SQL Types


http://git-wip-us.apache.org/repos/asf/spark/blob/8141d559/examples/src/main/python/sql/arrow.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py
index 4c5aefb..6c4510d 100644
--- a/examples/src/main/python/sql/arrow.py
+++ b/examples/src/main/python/sql/arrow.py
@@ -113,6 +113,43 @@ def grouped_map_pandas_udf_example(spark):
     # $example off:grouped_map_pandas_udf$
 
 
+def grouped_agg_pandas_udf_example(spark):
+    # $example on:grouped_agg_pandas_udf$
+    from pyspark.sql.functions import pandas_udf, PandasUDFType
+    from pyspark.sql import Window
+
+    df = spark.createDataFrame(
+        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+        ("id", "v"))
+
+    @pandas_udf("double", PandasUDFType.GROUPED_AGG)
+    def mean_udf(v):
+        return v.mean()
+
+    df.groupby("id").agg(mean_udf(df['v'])).show()
+    # +---+-----------+
+    # | id|mean_udf(v)|
+    # +---+-----------+
+    # |  1|        1.5|
+    # |  2|        6.0|
+    # +---+-----------+
+
+    w = Window \
+        .partitionBy('id') \
+        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
+    df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
+    # +---+----+------+
+    # | id|   v|mean_v|
+    # +---+----+------+
+    # |  1| 1.0|   1.5|
+    # |  1| 2.0|   1.5|
+    # |  2| 3.0|   6.0|
+    # |  2| 5.0|   6.0|
+    # |  2|10.0|   6.0|
+    # +---+----+------+
+    # $example off:grouped_agg_pandas_udf$
+
+
 if __name__ == "__main__":
     spark = SparkSession \
         .builder \


http://git-wip-us.apache.org/repos/asf/spark/blob/8141d559/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 0a88e48..dd7daf9 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2810,8 +2810,9 @@ def pandas_udf(f=None, returnType=None, functionType=None):
     >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
     ... def mean_udf(v):
     ...     return v.mean()
-    >>> w = Window.partitionBy('id') \\
-    ...     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
+    >>> w = Window \\
+    ...     .partitionBy('id') \\
+    ...     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
     >>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()  # doctest: +SKIP
     +---+----+------+
     | id|   v|mean_v|
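For reference, the documented API can also be exercised outside the docs build as a standalone script. The sketch below is illustrative rather than part of this patch; it assumes a Spark build that includes PandasUDFType.GROUPED_AGG (this branch) with PyArrow and pandas installed, and the app name "grouped_agg_demo" is an arbitrary placeholder:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    spark = SparkSession.builder.appName("grouped_agg_demo").getOrCreate()

    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))

    @pandas_udf("double", PandasUDFType.GROUPED_AGG)
    def mean_udf(v):
        # v arrives as a pandas.Series holding every value of column "v"
        # for one group; the UDF must reduce it to a single scalar.
        return v.mean()

    df.groupby("id").agg(mean_udf(df['v'])).show()

    spark.stop()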
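Because a grouped aggregate Pandas UDF plugs into `groupBy().agg()` like any Spark aggregate function, its output can be cross-checked against the built-in `mean`. This comparison sketch is not part of the patch and reuses `df` from the script above:

    from pyspark.sql.functions import mean

    # Built-in aggregate: runs in the JVM with no Arrow round trip,
    # and, unlike the Pandas UDF, supports partial aggregation.
    df.groupby("id").agg(mean(df['v']).alias('mean_v')).show()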
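The unbounded-window restriction called out in the new docs section can be seen directly in window queries. In this sketch (again reusing `df` and `mean_udf` from above), the bounded frame is left commented out because, per the limitation stated in the docs, it is expected to be rejected rather than computed in this version:

    from pyspark.sql import Window

    # Supported: an unbounded frame over each partition.
    w = Window.partitionBy('id') \
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()

    # Not yet supported: a bounded (sliding) frame such as the one below
    # is expected to fail for grouped aggregate Pandas UDFs at this point.
    # w_bounded = Window.partitionBy('id').rowsBetween(-1, 1)
    # df.withColumn('mean_v', mean_udf(df['v']).over(w_bounded)).show()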