This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 66fb2250224f [SPARK-45049][CONNECT][DOCS][TESTS] Refine docstrings of `coalesce/repartition/repartitionByRange`
66fb2250224f is described below

commit 66fb2250224ff9ffb71bf2b320ec05d1b33145c2
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Tue Sep 5 11:09:31 2023 +0900

    [SPARK-45049][CONNECT][DOCS][TESTS] Refine docstrings of `coalesce/repartition/repartitionByRange`

    ### What changes were proposed in this pull request?
    Enable doctests for `coalesce/repartition/repartitionByRange` by checking partition assignment with `spark_partition_id` instead of `rdd.getNumPartitions`.

    ### Why are the changes needed?
    Test coverage: these doctests were previously disabled under Spark Connect because they depended on the RDD API.

    ### Does this PR introduce _any_ user-facing change?
    Yes, the docstrings (user-facing documentation) change.

    ### How was this patch tested?
    Updated doctests.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #42770 from zhengruifeng/enable_doctest_partition.

    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
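The partition-counting idiom the new doctests rely on can also be exercised outside a doctest. Below is a minimal sketch, assuming a plain local session (the `local[4]` master, the session setup, and the variable names are illustrative, not part of the commit): it counts partitions through the DataFrame API via `spark_partition_id()`, which works under Spark Connect, rather than `df.rdd.getNumPartitions()`, which does not, since Connect exposes no RDD API. One caveat: the distinct count only reflects partitions that hold at least one row.

    # Minimal sketch of the Connect-friendly partition check used by the new
    # doctests. Session setup is illustrative, not part of the commit.
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as sf

    spark = SparkSession.builder.master("local[4]").getOrCreate()

    # 10 rows spread over 3 initial partitions.
    df = spark.range(0, 10, 1, 3)

    # Classic RDD-based check; unavailable under Spark Connect:
    # print(df.rdd.getNumPartitions())

    # DataFrame-API equivalent: each non-empty partition contributes exactly
    # one distinct spark_partition_id() value, so the distinct count equals
    # the number of non-empty partitions.
    num_partitions = (
        df.select(sf.spark_partition_id().alias("partition"))
        .distinct()
        .count()
    )
    print(num_partitions)  # 3

    spark.stop()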
---
 python/pyspark/sql/connect/dataframe.py |   4 --
 python/pyspark/sql/dataframe.py         | 117 +++++++++++++++++++++++++++-----
 2 files changed, 99 insertions(+), 22 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 86a635361858..b22fdc1383cf 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -2191,10 +2191,6 @@ def _test() -> None:
         os.chdir(os.environ["SPARK_HOME"])

     globs = pyspark.sql.connect.dataframe.__dict__.copy()
-    # Spark Connect does not support RDD but the tests depend on them.
-    del pyspark.sql.connect.dataframe.DataFrame.coalesce.__doc__
-    del pyspark.sql.connect.dataframe.DataFrame.repartition.__doc__
-    del pyspark.sql.connect.dataframe.DataFrame.repartitionByRange.__doc__

     # TODO(SPARK-41625): Support Structured Streaming
     del pyspark.sql.connect.dataframe.DataFrame.isStreaming.__doc__

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 4b8bdd1c2779..3d7bdd7a0b2b 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1763,9 +1763,27 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):

         Examples
         --------
-        >>> df = spark.range(10)
-        >>> df.coalesce(1).rdd.getNumPartitions()
-        1
+        >>> from pyspark.sql import functions as sf
+        >>> spark.range(0, 10, 1, 3).select(
+        ...     sf.spark_partition_id().alias("partition")
+        ... ).distinct().sort("partition").show()
+        +---------+
+        |partition|
+        +---------+
+        |        0|
+        |        1|
+        |        2|
+        +---------+
+
+        >>> from pyspark.sql import functions as sf
+        >>> spark.range(0, 10, 1, 3).coalesce(1).select(
+        ...     sf.spark_partition_id().alias("partition")
+        ... ).distinct().sort("partition").show()
+        +---------+
+        |partition|
+        +---------+
+        |        0|
+        +---------+
         """
         return DataFrame(self._jdf.coalesce(numPartitions), self.sparkSession)

@@ -1809,23 +1827,78 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):

         Examples
         --------
-        >>> df = spark.createDataFrame(
-        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
+        >>> from pyspark.sql import functions as sf
+        >>> df = spark.range(0, 64, 1, 9).withColumn(
+        ...     "name", sf.concat(sf.lit("name_"), sf.col("id").cast("string"))
+        ... ).withColumn(
+        ...     "age", sf.col("id") - 32
+        ... )
+        >>> df.select(
+        ...     sf.spark_partition_id().alias("partition")
+        ... ).distinct().sort("partition").show()
+        +---------+
+        |partition|
+        +---------+
+        |        0|
+        |        1|
+        |        2|
+        |        3|
+        |        4|
+        |        5|
+        |        6|
+        |        7|
+        |        8|
+        +---------+

         Repartition the data into 10 partitions.

-        >>> df.repartition(10).rdd.getNumPartitions()
-        10
+        >>> df.repartition(10).select(
+        ...     sf.spark_partition_id().alias("partition")
+        ... ).distinct().sort("partition").show()
+        +---------+
+        |partition|
+        +---------+
+        |        0|
+        |        1|
+        |        2|
+        |        3|
+        |        4|
+        |        5|
+        |        6|
+        |        7|
+        |        8|
+        |        9|
+        +---------+

         Repartition the data into 7 partitions by 'age' column.

-        >>> df.repartition(7, "age").rdd.getNumPartitions()
-        7
+        >>> df.repartition(7, "age").select(
+        ...     sf.spark_partition_id().alias("partition")
+        ... ).distinct().sort("partition").show()
+        +---------+
+        |partition|
+        +---------+
+        |        0|
+        |        1|
+        |        2|
+        |        3|
+        |        4|
+        |        5|
+        |        6|
+        +---------+

         Repartition the data into 3 partitions by 'name' and 'age' columns.

-        >>> df.repartition(3, "name", "age").rdd.getNumPartitions()
-        3
+        >>> df.repartition(3, "name", "age").select(
+        ...     sf.spark_partition_id().alias("partition")
+        ... ).distinct().sort("partition").show()
+        +---------+
+        |partition|
+        +---------+
+        |        0|
+        |        1|
+        |        2|
+        +---------+
         """
         if isinstance(numPartitions, int):
             if len(cols) == 0:

@@ -1893,15 +1966,23 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):

         Examples
         --------
-        >>> df = spark.createDataFrame(
-        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
-
         Repartition the data into 2 partitions by range in 'age' column.
-        For example, the first partition can have ``(14, "Tom")``, and the second
-        partition would have ``(16, "Bob")`` and ``(23, "Alice")``.
+        For example, the first partition can have ``(14, "Tom")`` and ``(16, "Bob")``,
+        and the second partition would have ``(23, "Alice")``.

-        >>> df.repartitionByRange(2, "age").rdd.getNumPartitions()
-        2
+        >>> from pyspark.sql import functions as sf
+        >>> spark.createDataFrame(
+        ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]
+        ... ).repartitionByRange(2, "age").select(
+        ...     "age", "name", sf.spark_partition_id()
+        ... ).show()
+        +---+-----+--------------------+
+        |age| name|SPARK_PARTITION_ID()|
+        +---+-----+--------------------+
+        | 14|  Tom|                   0|
+        | 16|  Bob|                   0|
+        | 23|Alice|                   1|
+        +---+-----+--------------------+
         """
         if isinstance(numPartitions, int):
             if len(cols) == 0:
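The `repartitionByRange` doctest above can also be reproduced as a standalone script. The sketch below assumes a plain local session (the `local[2]` master, the session setup, and the `partition` alias are illustrative, not from the commit). It shows the property the docstring describes: range partitioning places contiguous ranges of the 'age' key into each partition. The exact split point is chosen by sampling, so the boundary may vary for other inputs.

    # Hedged sketch reproducing the repartitionByRange doctest locally.
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as sf

    spark = SparkSession.builder.master("local[2]").getOrCreate()

    df = spark.createDataFrame(
        [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"]
    )

    # Each of the 2 range partitions holds a contiguous range of ages:
    # here ages 14 and 16 land in partition 0 and age 23 in partition 1,
    # matching the table in the docstring.
    (
        df.repartitionByRange(2, "age")
        .select("age", "name", sf.spark_partition_id().alias("partition"))
        .orderBy("age")
        .show()
    )

    spark.stop()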