This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 527cce5dfde  [SPARK-40006][PYTHON][DOCS] Make pyspark.sql.group examples self-contained
527cce5dfde is described below

commit 527cce5dfde68b0c58ba4b94c0288756201e3eff
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Tue Aug 9 12:12:29 2022 +0900

    [SPARK-40006][PYTHON][DOCS] Make pyspark.sql.group examples self-contained

    ### What changes were proposed in this pull request?

    This PR proposes to improve the examples in `pyspark.sql.group` by making each example self-contained, with a brief explanation and a somewhat more realistic data set.

    ### Why are the changes needed?

    To make the documentation more readable and easy to copy and paste directly into the PySpark shell.

    ### Does this PR introduce _any_ user-facing change?

    Yes, it changes the documentation.

    ### How was this patch tested?

    Manually ran each doctest.

    Closes #37437 from HyukjinKwon/SPARK-40006.

    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/group.py | 322 ++++++++++++++++++++++++++++++++++----------
 1 file changed, 251 insertions(+), 71 deletions(-)

diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py
index bece13684e0..2fbe76aa5ae 100644
--- a/python/pyspark/sql/group.py
+++ b/python/pyspark/sql/group.py
@@ -25,7 +25,6 @@ from pyspark.sql.column import Column, _to_seq
 from pyspark.sql.session import SparkSession
 from pyspark.sql.dataframe import DataFrame
 from pyspark.sql.pandas.group_ops import PandasGroupedOpsMixin
-from pyspark.sql.types import StructType, StructField, IntegerType, StringType
 
 if TYPE_CHECKING:
     from pyspark.sql._typing import LiteralType
@@ -112,20 +111,53 @@ class GroupedData(PandasGroupedOpsMixin):
 
         Examples
         --------
-        >>> gdf = df.groupBy(df.name)
-        >>> sorted(gdf.agg({"*": "count"}).collect())
-        [Row(name='Alice', count(1)=1), Row(name='Bob', count(1)=1)]
-        >>> from pyspark.sql import functions as F
-        >>> sorted(gdf.agg(F.min(df.age)).collect())
-        [Row(name='Alice', min(age)=2), Row(name='Bob', min(age)=5)]
-        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> df = spark.createDataFrame(
+        ...     [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"])
+        >>> df.show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        |  3|Alice|
+        |  5|  Bob|
+        | 10|  Bob|
+        +---+-----+
+
+        Group-by name, and count each group.
+
+        >>> df.groupBy(df.name).agg({"*": "count"}).sort("name").show()
+        +-----+--------+
+        | name|count(1)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       2|
+        +-----+--------+
+
+        Group-by name, and calculate the minimum age.
+
+        >>> df.groupBy(df.name).agg(F.min(df.age)).sort("name").show()
+        +-----+--------+
+        | name|min(age)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       5|
+        +-----+--------+
+
+        Same as above but uses pandas UDF.
+
         >>> @pandas_udf('int', PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
         ... def min_udf(v):
         ...     return v.min()
-        >>> sorted(gdf.agg(min_udf(df.age)).collect())  # doctest: +SKIP
-        [Row(name='Alice', min_udf(age)=2), Row(name='Bob', min_udf(age)=5)]
+        ...
+        >>> df.groupBy(df.name).agg(min_udf(df.age)).sort("name").show()  # doctest: +SKIP
+        +-----+------------+
+        | name|min_udf(age)|
+        +-----+------------+
+        |Alice|           2|
+        |  Bob|           5|
+        +-----+------------+
         """
         assert exprs, "exprs should not be empty"
         if len(exprs) == 1 and isinstance(exprs[0], dict):
@@ -145,8 +177,27 @@ class GroupedData(PandasGroupedOpsMixin):
 
         Examples
         --------
-        >>> sorted(df.groupBy(df.age).count().collect())
-        [Row(age=2, count=1), Row(age=5, count=1)]
+        >>> df = spark.createDataFrame(
+        ...     [(2, "Alice"), (3, "Alice"), (5, "Bob"), (10, "Bob")], ["age", "name"])
+        >>> df.show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        |  3|Alice|
+        |  5|  Bob|
+        | 10|  Bob|
+        +---+-----+
+
+        Group-by name, and count each group.
+
+        >>> df.groupBy(df.name).count().sort("name").show()
+        +-----+-----+
+        | name|count|
+        +-----+-----+
+        |Alice|    2|
+        |  Bob|    2|
+        +-----+-----+
         """
 
     @df_varargs_api
@@ -161,13 +212,6 @@ class GroupedData(PandasGroupedOpsMixin):
         ----------
         cols : str
             column names. Non-numeric columns are ignored.
-
-        Examples
-        --------
-        >>> df.groupBy().mean('age').collect()
-        [Row(avg(age)=3.5)]
-        >>> df3.groupBy().mean('age', 'height').collect()
-        [Row(avg(age)=3.5, avg(height)=82.5)]
         """
 
     @df_varargs_api
@@ -185,10 +229,37 @@ class GroupedData(PandasGroupedOpsMixin):
 
         Examples
         --------
-        >>> df.groupBy().avg('age').collect()
-        [Row(avg(age)=3.5)]
-        >>> df3.groupBy().avg('age', 'height').collect()
-        [Row(avg(age)=3.5, avg(height)=82.5)]
+        >>> df = spark.createDataFrame([
+        ...     (2, "Alice", 80), (3, "Alice", 100),
+        ...     (5, "Bob", 120), (10, "Bob", 140)], ["age", "name", "height"])
+        >>> df.show()
+        +---+-----+------+
+        |age| name|height|
+        +---+-----+------+
+        |  2|Alice|    80|
+        |  3|Alice|   100|
+        |  5|  Bob|   120|
+        | 10|  Bob|   140|
+        +---+-----+------+
+
+        Group-by name, and calculate the mean of the age in each group.
+
+        >>> df.groupBy("name").avg('age').sort("name").show()
+        +-----+--------+
+        | name|avg(age)|
+        +-----+--------+
+        |Alice|     2.5|
+        |  Bob|     7.5|
+        +-----+--------+
+
+        Calculate the mean of the age and height in all data.
+
+        >>> df.groupBy().avg('age', 'height').show()
+        +--------+-----------+
+        |avg(age)|avg(height)|
+        +--------+-----------+
+        |     5.0|      110.0|
+        +--------+-----------+
         """
 
     @df_varargs_api
@@ -199,10 +270,37 @@ class GroupedData(PandasGroupedOpsMixin):
 
         Examples
         --------
-        >>> df.groupBy().max('age').collect()
-        [Row(max(age)=5)]
-        >>> df3.groupBy().max('age', 'height').collect()
-        [Row(max(age)=5, max(height)=85)]
+        >>> df = spark.createDataFrame([
+        ...     (2, "Alice", 80), (3, "Alice", 100),
+        ...     (5, "Bob", 120), (10, "Bob", 140)], ["age", "name", "height"])
+        >>> df.show()
+        +---+-----+------+
+        |age| name|height|
+        +---+-----+------+
+        |  2|Alice|    80|
+        |  3|Alice|   100|
+        |  5|  Bob|   120|
+        | 10|  Bob|   140|
+        +---+-----+------+
+
+        Group-by name, and calculate the max of the age in each group.
+
+        >>> df.groupBy("name").max("age").sort("name").show()
+        +-----+--------+
+        | name|max(age)|
+        +-----+--------+
+        |Alice|       3|
+        |  Bob|      10|
+        +-----+--------+
+
+        Calculate the max of the age and height in all data.
+
+        >>> df.groupBy().max("age", "height").show()
+        +--------+-----------+
+        |max(age)|max(height)|
+        +--------+-----------+
+        |      10|        140|
+        +--------+-----------+
         """
 
     @df_varargs_api
@@ -218,10 +316,37 @@ class GroupedData(PandasGroupedOpsMixin):
 
         Examples
         --------
-        >>> df.groupBy().min('age').collect()
-        [Row(min(age)=2)]
-        >>> df3.groupBy().min('age', 'height').collect()
-        [Row(min(age)=2, min(height)=80)]
+        >>> df = spark.createDataFrame([
+        ...     (2, "Alice", 80), (3, "Alice", 100),
+        ...     (5, "Bob", 120), (10, "Bob", 140)], ["age", "name", "height"])
+        >>> df.show()
+        +---+-----+------+
+        |age| name|height|
+        +---+-----+------+
+        |  2|Alice|    80|
+        |  3|Alice|   100|
+        |  5|  Bob|   120|
+        | 10|  Bob|   140|
+        +---+-----+------+
+
+        Group-by name, and calculate the min of the age in each group.
+
+        >>> df.groupBy("name").min("age").sort("name").show()
+        +-----+--------+
+        | name|min(age)|
+        +-----+--------+
+        |Alice|       2|
+        |  Bob|       5|
+        +-----+--------+
+
+        Calculate the min of the age and height in all data.
+
+        >>> df.groupBy().min("age", "height").show()
+        +--------+-----------+
+        |min(age)|min(height)|
+        +--------+-----------+
+        |       2|         80|
+        +--------+-----------+
         """
 
     @df_varargs_api
@@ -237,10 +362,37 @@ class GroupedData(PandasGroupedOpsMixin):
 
         Examples
         --------
-        >>> df.groupBy().sum('age').collect()
-        [Row(sum(age)=7)]
-        >>> df3.groupBy().sum('age', 'height').collect()
-        [Row(sum(age)=7, sum(height)=165)]
+        >>> df = spark.createDataFrame([
+        ...     (2, "Alice", 80), (3, "Alice", 100),
+        ...     (5, "Bob", 120), (10, "Bob", 140)], ["age", "name", "height"])
+        >>> df.show()
+        +---+-----+------+
+        |age| name|height|
+        +---+-----+------+
+        |  2|Alice|    80|
+        |  3|Alice|   100|
+        |  5|  Bob|   120|
+        | 10|  Bob|   140|
+        +---+-----+------+
+
+        Group-by name, and calculate the sum of the age in each group.
+
+        >>> df.groupBy("name").sum("age").sort("name").show()
+        +-----+--------+
+        | name|sum(age)|
+        +-----+--------+
+        |Alice|       5|
+        |  Bob|      15|
+        +-----+--------+
+
+        Calculate the sum of the age and height in all data.
+
+        >>> df.groupBy().sum("age", "height").show()
+        +--------+-----------+
+        |sum(age)|sum(height)|
+        +--------+-----------+
+        |      20|        440|
+        +--------+-----------+
         """
 
     def pivot(self, pivot_col: str, values: Optional[List["LiteralType"]] = None) -> "GroupedData":
@@ -261,17 +413,69 @@
 
         Examples
         --------
-        # Compute the sum of earnings for each year by course with each course as a separate column
-
-        >>> df4.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").collect()
-        [Row(year=2012, dotNET=15000, Java=20000), Row(year=2013, dotNET=48000, Java=30000)]
-
-        # Or without specifying column values (less efficient)
-
-        >>> df4.groupBy("year").pivot("course").sum("earnings").collect()
-        [Row(year=2012, Java=20000, dotNET=15000), Row(year=2013, Java=30000, dotNET=48000)]
-        >>> df5.groupBy("sales.year").pivot("sales.course").sum("sales.earnings").collect()
-        [Row(year=2012, Java=20000, dotNET=15000), Row(year=2013, Java=30000, dotNET=48000)]
+        >>> from pyspark.sql import Row
+        >>> spark = SparkSession.builder.master("local[4]").appName("sql.group tests").getOrCreate()
+        >>> df1 = spark.createDataFrame([
+        ...     Row(course="dotNET", year=2012, earnings=10000),
+        ...     Row(course="Java", year=2012, earnings=20000),
+        ...     Row(course="dotNET", year=2012, earnings=5000),
+        ...     Row(course="dotNET", year=2013, earnings=48000),
+        ...     Row(course="Java", year=2013, earnings=30000),
+        ... ])
+        >>> df1.show()
+        +------+----+--------+
+        |course|year|earnings|
+        +------+----+--------+
+        |dotNET|2012|   10000|
+        |  Java|2012|   20000|
+        |dotNET|2012|    5000|
+        |dotNET|2013|   48000|
+        |  Java|2013|   30000|
+        +------+----+--------+
+        >>> df2 = spark.createDataFrame([
+        ...     Row(training="expert", sales=Row(course="dotNET", year=2012, earnings=10000)),
+        ...     Row(training="junior", sales=Row(course="Java", year=2012, earnings=20000)),
+        ...     Row(training="expert", sales=Row(course="dotNET", year=2012, earnings=5000)),
+        ...     Row(training="junior", sales=Row(course="dotNET", year=2013, earnings=48000)),
+        ...     Row(training="expert", sales=Row(course="Java", year=2013, earnings=30000)),
+        ... ])
+        >>> df2.show()
+        +--------+--------------------+
+        |training|               sales|
+        +--------+--------------------+
+        |  expert|{dotNET, 2012, 10...|
+        |  junior| {Java, 2012, 20000}|
+        |  expert|{dotNET, 2012, 5000}|
+        |  junior|{dotNET, 2013, 48...|
+        |  expert| {Java, 2013, 30000}|
+        +--------+--------------------+
+
+        Compute the sum of earnings for each year by course with each course as a separate column
+
+        >>> df1.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").show()
+        +----+------+-----+
+        |year|dotNET| Java|
+        +----+------+-----+
+        |2012| 15000|20000|
+        |2013| 48000|30000|
+        +----+------+-----+
+
+        Or without specifying column values (less efficient)
+
+        >>> df1.groupBy("year").pivot("course").sum("earnings").show()
+        +----+-----+------+
+        |year| Java|dotNET|
+        +----+-----+------+
+        |2012|20000| 15000|
+        |2013|30000| 48000|
+        +----+-----+------+
+        >>> df2.groupBy("sales.year").pivot("sales.course").sum("sales.earnings").show()
+        +----+-----+------+
+        |year| Java|dotNET|
+        +----+-----+------+
+        |2012|20000| 15000|
+        |2013|30000| 48000|
+        +----+-----+------+
         """
         if values is None:
             jgd = self._jgd.pivot(pivot_col)
@@ -282,7 +486,7 @@ class GroupedData(PandasGroupedOpsMixin):
 
 def _test() -> None:
     import doctest
-    from pyspark.sql import Row, SparkSession
+    from pyspark.sql import SparkSession
 
     import pyspark.sql.group
 
@@ -290,30 +494,6 @@ def _test() -> None:
     globs = pyspark.sql.group.__dict__.copy()
     sc = spark.sparkContext
     globs["sc"] = sc
     globs["spark"] = spark
-    globs["df"] = sc.parallelize([(2, "Alice"), (5, "Bob")]).toDF(
-        StructType([StructField("age", IntegerType()), StructField("name", StringType())])
-    )
-    globs["df3"] = sc.parallelize(
-        [Row(name="Alice", age=2, height=80), Row(name="Bob", age=5, height=85)]
-    ).toDF()
-    globs["df4"] = sc.parallelize(
-        [
-            Row(course="dotNET", year=2012, earnings=10000),
-            Row(course="Java", year=2012, earnings=20000),
-            Row(course="dotNET", year=2012, earnings=5000),
-            Row(course="dotNET", year=2013, earnings=48000),
-            Row(course="Java", year=2013, earnings=30000),
-        ]
-    ).toDF()
-    globs["df5"] = sc.parallelize(
-        [
-            Row(training="expert", sales=Row(course="dotNET", year=2012, earnings=10000)),
-            Row(training="junior", sales=Row(course="Java", year=2012, earnings=20000)),
-            Row(training="expert", sales=Row(course="dotNET", year=2012, earnings=5000)),
-            Row(training="junior", sales=Row(course="dotNET", year=2013, earnings=48000)),
-            Row(training="expert", sales=Row(course="Java", year=2013, earnings=30000)),
-        ]
-    ).toDF()
 
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.group,