This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 9dc39e199de [SPARK-41010][CONNECT][PYTHON] Complete Support for Except and Intersect in Python client 9dc39e199de is described below commit 9dc39e199de645f60e115267fba2fae782ab53f1 Author: Rui Wang <rui.w...@databricks.com> AuthorDate: Thu Nov 10 12:11:40 2022 +0800 [SPARK-41010][CONNECT][PYTHON] Complete Support for Except and Intersect in Python client ### What changes were proposed in this pull request? 1. Add support for intersect and except. 2. Unify union, intersect and except into `SetOperation`. ### Why are the changes needed? Improve API coverage. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #38506 from amaliujia/except_python. Authored-by: Rui Wang <rui.w...@databricks.com> Signed-off-by: Ruifeng Zheng <ruife...@apache.org> --- python/pyspark/sql/connect/dataframe.py | 82 +++++++++++++++++++++- python/pyspark/sql/connect/plan.py | 38 +++++++--- .../sql/tests/connect/test_connect_plan_only.py | 22 ++++++ 3 files changed, 132 insertions(+), 10 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index c6877707ad2..ccd826cd476 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -389,7 +389,9 @@ class DataFrame(object): def unionAll(self, other: "DataFrame") -> "DataFrame": if other._plan is None: raise ValueError("Argument to Union does not contain a valid plan.") - return DataFrame.withPlan(plan.UnionAll(self._plan, other._plan), session=self._session) + return DataFrame.withPlan( + plan.SetOperation(self._plan, other._plan, "union", is_all=True), session=self._session + ) def unionByName(self, other: "DataFrame", allowMissingColumns: bool = False) -> "DataFrame": """Returns a new :class:`DataFrame` containing union of rows in this and another @@ -415,7 +417,83 @@ class DataFrame(object): if other._plan is None: raise ValueError("Argument to UnionByName does not contain a valid plan.") return DataFrame.withPlan( - plan.UnionAll(self._plan, other._plan, allowMissingColumns), session=self._session + plan.SetOperation( + self._plan, other._plan, "union", is_all=True, by_name=allowMissingColumns + ), + session=self._session, + ) + + def exceptAll(self, other: "DataFrame") -> "DataFrame": + """Return a new :class:`DataFrame` containing rows in this :class:`DataFrame` but + not in another :class:`DataFrame` while preserving duplicates. + + This is equivalent to `EXCEPT ALL` in SQL. + As standard in SQL, this function resolves columns by position (not by name). + + .. versionadded:: 3.4.0 + + Parameters + ---------- + other : :class:`DataFrame` + The other :class:`DataFrame` to compare to. + + Returns + ------- + :class:`DataFrame` + """ + return DataFrame.withPlan( + plan.SetOperation(self._plan, other._plan, "except", is_all=True), session=self._session + ) + + def intersect(self, other: "DataFrame") -> "DataFrame": + """Return a new :class:`DataFrame` containing rows only in + both this :class:`DataFrame` and another :class:`DataFrame`. + Note that any duplicates are removed. To preserve duplicates + use :func:`intersectAll`. + + .. versionadded:: 3.4.0 + + Parameters + ---------- + other : :class:`DataFrame` + Another :class:`DataFrame` that needs to be combined. + + Returns + ------- + :class:`DataFrame` + Combined DataFrame. + + Notes + ----- + This is equivalent to `INTERSECT` in SQL. + """ + return DataFrame.withPlan( + plan.SetOperation(self._plan, other._plan, "intersect", is_all=False), + session=self._session, + ) + + def intersectAll(self, other: "DataFrame") -> "DataFrame": + """Return a new :class:`DataFrame` containing rows in both this :class:`DataFrame` + and another :class:`DataFrame` while preserving duplicates. + + This is equivalent to `INTERSECT ALL` in SQL. As standard in SQL, this function + resolves columns by position (not by name). + + .. versionadded:: 3.4.0 + + Parameters + ---------- + other : :class:`DataFrame` + Another :class:`DataFrame` that needs to be combined. + + Returns + ------- + :class:`DataFrame` + Combined DataFrame. + """ + return DataFrame.withPlan( + plan.SetOperation(self._plan, other._plan, "intersect", is_all=True), + session=self._session, ) def where(self, condition: Expression) -> "DataFrame": diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index 3bb5558d04b..acc5927b519 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -607,21 +607,43 @@ class Join(LogicalPlan): """ -class UnionAll(LogicalPlan): +class SetOperation(LogicalPlan): def __init__( - self, child: Optional["LogicalPlan"], other: "LogicalPlan", by_name: bool = False + self, + child: Optional["LogicalPlan"], + other: Optional["LogicalPlan"], + set_op: str, + is_all: bool = True, + by_name: bool = False, ) -> None: super().__init__(child) self.other = other self.by_name = by_name + self.is_all = is_all + self.set_op = set_op def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation: assert self._child is not None rel = proto.Relation() - rel.set_op.left_input.CopyFrom(self._child.plan(session)) - rel.set_op.right_input.CopyFrom(self.other.plan(session)) - rel.set_op.set_op_type = proto.SetOperation.SET_OP_TYPE_UNION - rel.set_op.is_all = True + if self._child is not None: + rel.set_op.left_input.CopyFrom(self._child.plan(session)) + if self.other is not None: + rel.set_op.right_input.CopyFrom(self.other.plan(session)) + if self.set_op == "union": + rel.set_op.set_op_type = proto.SetOperation.SET_OP_TYPE_UNION + elif self.set_op == "intersect": + rel.set_op.set_op_type = proto.SetOperation.SET_OP_TYPE_INTERSECT + elif self.set_op == "except": + rel.set_op.set_op_type = proto.SetOperation.SET_OP_TYPE_EXCEPT + else: + raise NotImplementedError( + """ + Unsupported set operation type: %s. + """ + % rel.set_op.set_op_type + ) + + rel.set_op.is_all = self.is_all rel.set_op.by_name = self.by_name return rel @@ -633,7 +655,7 @@ class UnionAll(LogicalPlan): o = " " * (indent + LogicalPlan.INDENT) n = indent + LogicalPlan.INDENT * 2 return ( - f"{i}UnionAll\n{o}child1=\n{self._child.print(n)}" + f"{i}SetOperation\n{o}child1=\n{self._child.print(n)}" f"\n{o}child2=\n{self.other.print(n)}" ) @@ -644,7 +666,7 @@ class UnionAll(LogicalPlan): return f""" <ul> <li> - <b>Union</b><br /> + <b>SetOperation</b><br /> Left: {self._child._repr_html_()} Right: {self.other._repr_html_()} </li> diff --git a/python/pyspark/sql/tests/connect/test_connect_plan_only.py b/python/pyspark/sql/tests/connect/test_connect_plan_only.py index 6807a13a8c9..adfaa651c08 100644 --- a/python/pyspark/sql/tests/connect/test_connect_plan_only.py +++ b/python/pyspark/sql/tests/connect/test_connect_plan_only.py @@ -210,10 +210,32 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture): df2 = self.connect.readTable(table_name=self.tbl_name) plan1 = df1.union(df2)._plan.to_proto(self.connect) self.assertTrue(plan1.root.set_op.is_all) + self.assertEqual(proto.SetOperation.SET_OP_TYPE_UNION, plan1.root.set_op.set_op_type) plan2 = df1.union(df2)._plan.to_proto(self.connect) self.assertTrue(plan2.root.set_op.is_all) + self.assertEqual(proto.SetOperation.SET_OP_TYPE_UNION, plan2.root.set_op.set_op_type) plan3 = df1.unionByName(df2, True)._plan.to_proto(self.connect) self.assertTrue(plan3.root.set_op.by_name) + self.assertEqual(proto.SetOperation.SET_OP_TYPE_UNION, plan3.root.set_op.set_op_type) + + def test_except(self): + # SPARK-41010: test `except` API for Python client. + df1 = self.connect.readTable(table_name=self.tbl_name) + df2 = self.connect.readTable(table_name=self.tbl_name) + plan1 = df1.exceptAll(df2)._plan.to_proto(self.connect) + self.assertTrue(plan1.root.set_op.is_all) + self.assertEqual(proto.SetOperation.SET_OP_TYPE_EXCEPT, plan1.root.set_op.set_op_type) + + def test_intersect(self): + # SPARK-41010: test `intersect` API for Python client. + df1 = self.connect.readTable(table_name=self.tbl_name) + df2 = self.connect.readTable(table_name=self.tbl_name) + plan1 = df1.intersect(df2)._plan.to_proto(self.connect) + self.assertFalse(plan1.root.set_op.is_all) + self.assertEqual(proto.SetOperation.SET_OP_TYPE_INTERSECT, plan1.root.set_op.set_op_type) + plan2 = df1.intersectAll(df2)._plan.to_proto(self.connect) + self.assertTrue(plan2.root.set_op.is_all) + self.assertEqual(proto.SetOperation.SET_OP_TYPE_INTERSECT, plan2.root.set_op.set_op_type) def test_coalesce_and_repartition(self): # SPARK-41026: test Coalesce and Repartition API in Python client. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org