This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 10722044f42 [SPARK-40977][CONNECT][PYTHON] Complete Support for Union in Python client 10722044f42 is described below commit 10722044f429b1a825018673ca139d698559f6fa Author: Rui Wang <rui.w...@databricks.com> AuthorDate: Thu Nov 3 13:53:23 2022 +0900 [SPARK-40977][CONNECT][PYTHON] Complete Support for Union in Python client ### What changes were proposed in this pull request? 1. Improve testing coverage for `Union` and `UnionAll` (they are actually both `UnionAll`) 2. Add the API which does `UnionByName`. ### Why are the changes needed? Improve API Coverage. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #38453 from amaliujia/python_union. Authored-by: Rui Wang <rui.w...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- python/pyspark/sql/connect/dataframe.py | 27 ++++++++++++++++++++++ python/pyspark/sql/connect/plan.py | 6 ++++- .../sql/tests/connect/test_connect_plan_only.py | 10 ++++++++ 3 files changed, 42 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index b9ddb0db300..b9ba4b99ba0 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -293,6 +293,33 @@ class DataFrame(object): raise ValueError("Argument to Union does not contain a valid plan.") return DataFrame.withPlan(plan.UnionAll(self._plan, other._plan), session=self._session) + def unionByName(self, other: "DataFrame", allowMissingColumns: bool = False) -> "DataFrame": + """Returns a new :class:`DataFrame` containing union of rows in this and another + :class:`DataFrame`. + + This is different from both `UNION ALL` and `UNION DISTINCT` in SQL. To do a SQL-style set + union (that does deduplication of elements), use this function followed by :func:`distinct`. + + .. 
versionadded:: 3.4.0 + + Parameters + ---------- + other : :class:`DataFrame` + Another :class:`DataFrame` that needs to be combined. + allowMissingColumns : bool, optional, default False + Specify whether to allow missing columns. + + Returns + ------- + :class:`DataFrame` + Combined DataFrame. + """ + if other._plan is None: + raise ValueError("Argument to UnionByName does not contain a valid plan.") + return DataFrame.withPlan( + plan.UnionAll(self._plan, other._plan, allowMissingColumns), session=self._session + ) + def where(self, condition: Expression) -> "DataFrame": return self.filter(condition) diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index 2f1f70ec1a9..cc59a493d5a 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -606,9 +606,12 @@ class Join(LogicalPlan): class UnionAll(LogicalPlan): - def __init__(self, child: Optional["LogicalPlan"], other: "LogicalPlan") -> None: + def __init__( + self, child: Optional["LogicalPlan"], other: "LogicalPlan", by_name: bool = False + ) -> None: super().__init__(child) self.other = other + self.by_name = by_name def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation: assert self._child is not None @@ -617,6 +620,7 @@ class UnionAll(LogicalPlan): rel.set_op.right_input.CopyFrom(self.other.plan(session)) rel.set_op.set_op_type = proto.SetOperation.SET_OP_TYPE_UNION rel.set_op.is_all = True + rel.set_op.by_name = self.by_name return rel def print(self, indent: int = 0) -> str: diff --git a/python/pyspark/sql/tests/connect/test_connect_plan_only.py b/python/pyspark/sql/tests/connect/test_connect_plan_only.py index e40a54b7d0c..8a9b98e73fd 100644 --- a/python/pyspark/sql/tests/connect/test_connect_plan_only.py +++ b/python/pyspark/sql/tests/connect/test_connect_plan_only.py @@ -190,6 +190,16 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture): self.assertIsNotNone(plan.root, "Root relation must be set") 
self.assertIsNotNone(plan.root.read) + def test_union(self): + df1 = self.connect.readTable(table_name=self.tbl_name) + df2 = self.connect.readTable(table_name=self.tbl_name) + plan1 = df1.union(df2)._plan.to_proto(self.connect) + self.assertTrue(plan1.root.set_op.is_all) + plan2 = df1.unionAll(df2)._plan.to_proto(self.connect) + self.assertTrue(plan2.root.set_op.is_all) + plan3 = df1.unionByName(df2, True)._plan.to_proto(self.connect) + self.assertTrue(plan3.root.set_op.by_name) + if __name__ == "__main__": from pyspark.sql.tests.connect.test_connect_plan_only import * # noqa: F401 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org