This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 736be3116c7 [SPARK-41026][CONNECT][PYTHON][FOLLOW-UP] Add Coalesce and Repartition API to Python client 736be3116c7 is described below commit 736be3116c7c13c82eac91f426ee6e96753c9cf5 Author: Rui Wang <rui.w...@databricks.com> AuthorDate: Wed Nov 9 17:35:43 2022 +0800 [SPARK-41026][CONNECT][PYTHON][FOLLOW-UP] Add Coalesce and Repartition API to Python client ### What changes were proposed in this pull request? Following up https://github.com/apache/spark/pull/38529, with `Repartition` proto we can support `Coalesce` and `Repartition` API in Python client. ### Why are the changes needed? Improve API coverage ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT Closes #38549 from amaliujia/support_coalesce_in_python. Authored-by: Rui Wang <rui.w...@databricks.com> Signed-off-by: Ruifeng Zheng <ruife...@apache.org> --- python/pyspark/sql/connect/dataframe.py | 49 +++++++++++++++++++++- python/pyspark/sql/connect/plan.py | 34 +++++++++++++++ .../sql/tests/connect/test_connect_plan_only.py | 17 ++++++++ 3 files changed, 98 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 64b2e54f0ef..c6877707ad2 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -156,8 +156,53 @@ class DataFrame(object): def crossJoin(self, other: "DataFrame") -> "DataFrame": ... - def coalesce(self, num_partitions: int) -> "DataFrame": - ... + def coalesce(self, numPartitions: int) -> "DataFrame": + """ + Returns a new :class:`DataFrame` that has exactly `numPartitions` partitions. + + Coalesce does not trigger a shuffle. + + .. 
versionadded:: 3.4.0 + + Parameters + ---------- + numPartitions : int + specify the target number of partitions + + Returns + ------- + :class:`DataFrame` + """ + if not numPartitions > 0: + raise ValueError("numPartitions must be positive.") + return DataFrame.withPlan( + plan.Repartition(self._plan, num_partitions=numPartitions, shuffle=False), + self._session, + ) + + def repartition(self, numPartitions: int) -> "DataFrame": + """ + Returns a new :class:`DataFrame` that has exactly `numPartitions` partitions. + + Repartition will shuffle source partition into partitions specified by numPartitions. + + .. versionadded:: 3.4.0 + + Parameters + ---------- + numPartitions : int + specify the target number of partitions + + Returns + ------- + :class:`DataFrame` + """ + if not numPartitions > 0: + raise ValueError("numPartitions must be positive.") + return DataFrame.withPlan( + plan.Repartition(self._plan, num_partitions=numPartitions, shuffle=True), + self._session, + ) def describe(self, cols: List[ColumnRef]) -> Any: ... 
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index 1d5c80f510e..3bb5558d04b 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -652,6 +652,40 @@ class UnionAll(LogicalPlan): """ +class Repartition(LogicalPlan): + """Repartition Relation into a different number of partitions.""" + + def __init__(self, child: Optional["LogicalPlan"], num_partitions: int, shuffle: bool) -> None: + super().__init__(child) + self._num_partitions = num_partitions + self._shuffle = shuffle + + def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation: + rel = proto.Relation() + if self._child is not None: + rel.repartition.input.CopyFrom(self._child.plan(session)) + rel.repartition.shuffle = self._shuffle + rel.repartition.num_partitions = self._num_partitions + return rel + + def print(self, indent: int = 0) -> str: + plan_name = "repartition" if self._shuffle else "coalesce" + c_buf = self._child.print(indent + LogicalPlan.INDENT) if self._child else "" + return f"{' ' * indent}<{plan_name} num_partitions={self._num_partitions}>\n{c_buf}" + + def _repr_html_(self) -> str: + plan_name = "repartition" if self._shuffle else "coalesce" + return f""" + <ul> + <li> + <b>{plan_name}</b><br /> + Child: {self._child_repr_()} + num_partitions: {self._num_partitions} + </li> + </ul> + """ + + class SubqueryAlias(LogicalPlan): """Alias for a relation.""" diff --git a/python/pyspark/sql/tests/connect/test_connect_plan_only.py b/python/pyspark/sql/tests/connect/test_connect_plan_only.py index 468099cb5c9..6807a13a8c9 100644 --- a/python/pyspark/sql/tests/connect/test_connect_plan_only.py +++ b/python/pyspark/sql/tests/connect/test_connect_plan_only.py @@ -215,6 +215,23 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture): plan3 = df1.unionByName(df2, True)._plan.to_proto(self.connect) self.assertTrue(plan3.root.set_op.by_name) + def test_coalesce_and_repartition(self): + # SPARK-41026: test Coalesce 
and Repartition API in Python client. + df = self.connect.readTable(table_name=self.tbl_name) + plan1 = df.coalesce(10)._plan.to_proto(self.connect) + self.assertEqual(10, plan1.root.repartition.num_partitions) + self.assertFalse(plan1.root.repartition.shuffle) + plan2 = df.repartition(20)._plan.to_proto(self.connect) + self.assertTrue(plan2.root.repartition.shuffle) + + with self.assertRaises(ValueError) as context: + df.coalesce(-1)._plan.to_proto(self.connect) + self.assertTrue("numPartitions must be positive" in str(context.exception)) + + with self.assertRaises(ValueError) as context: + df.repartition(-1)._plan.to_proto(self.connect) + self.assertTrue("numPartitions must be positive" in str(context.exception)) + if __name__ == "__main__": from pyspark.sql.tests.connect.test_connect_plan_only import * # noqa: F401 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org