This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 31bff52431d [SPARK-40713][CONNECT] Improve SET operation support in the proto and the server 31bff52431d is described below commit 31bff52431d25dd36253d3cafa17c233ff6a54f2 Author: Rui Wang <rui.w...@databricks.com> AuthorDate: Mon Oct 31 14:24:38 2022 +0800 [SPARK-40713][CONNECT] Improve SET operation support in the proto and the server ### What changes were proposed in this pull request? 1. Rename `Union` to `SetOperation` and make it cover `except`, `intersect`, `union`. 2. Improve server side support for `SetOperation`. ### Why are the changes needed? Improve SET operation support in the proto and the server. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #38166 from amaliujia/SPARK-40713. Authored-by: Rui Wang <rui.w...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../main/protobuf/spark/connect/relations.proto | 24 +++-- .../org/apache/spark/sql/connect/dsl/package.scala | 47 ++++++++++ .../sql/connect/planner/SparkConnectPlanner.scala | 40 ++++++-- .../connect/planner/SparkConnectPlannerSuite.scala | 39 ++++++-- .../connect/planner/SparkConnectProtoSuite.scala | 36 +++++++ python/pyspark/sql/connect/plan.py | 6 +- python/pyspark/sql/connect/proto/relations_pb2.py | 104 ++++++++++----------- python/pyspark/sql/connect/proto/relations_pb2.pyi | 95 ++++++++++++------- 8 files changed, 279 insertions(+), 112 deletions(-) diff --git a/connector/connect/src/main/protobuf/spark/connect/relations.proto b/connector/connect/src/main/protobuf/spark/connect/relations.proto index 94010487ee5..8421535dd8e 100644 --- a/connector/connect/src/main/protobuf/spark/connect/relations.proto +++ b/connector/connect/src/main/protobuf/spark/connect/relations.proto @@ -35,7 +35,7 @@ message Relation { Project project = 3; Filter filter = 4; Join join = 5; - Union union = 6; + SetOperation set_op = 6; Sort sort = 7; Limit limit = 8; Aggregate aggregate = 9; @@ -127,15 +127,19 @@ message Join { } } -// Relation of type [[Union]], at least one input must be set. -message Union { - repeated Relation inputs = 1; - UnionType union_type = 2; - - enum UnionType { - UNION_TYPE_UNSPECIFIED = 0; - UNION_TYPE_DISTINCT = 1; - UNION_TYPE_ALL = 2; +// Relation of type [[SetOperation]] +message SetOperation { + Relation left_input = 1; + Relation right_input = 2; + SetOpType set_op_type = 3; + bool is_all = 4; + bool by_name = 5; + + enum SetOpType { + SET_OP_TYPE_UNSPECIFIED = 0; + SET_OP_TYPE_INTERSECT = 1; + SET_OP_TYPE_UNION = 2; + SET_OP_TYPE_EXCEPT = 3; } } diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala index a9a97e740d8..9ffc4c4a1fe 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala @@ -21,6 +21,7 @@ import scala.language.implicitConversions import org.apache.spark.connect.proto import org.apache.spark.connect.proto.Join.JoinType +import org.apache.spark.connect.proto.SetOperation.SetOpType import org.apache.spark.sql.SaveMode import org.apache.spark.sql.connect.planner.DataTypeProtoConverter @@ -322,6 +323,52 @@ package object dsl { // resolution only by name in the analyzer. proto.Relation.newBuilder().setAggregate(agg.build()).build() } + + def except(otherPlan: proto.Relation, isAll: Boolean): proto.Relation = { + proto.Relation + .newBuilder() + .setSetOp( + createSetOperation(logicalPlan, otherPlan, SetOpType.SET_OP_TYPE_EXCEPT, isAll)) + .build() + } + + def intersect(otherPlan: proto.Relation, isAll: Boolean): proto.Relation = + proto.Relation + .newBuilder() + .setSetOp( + createSetOperation(logicalPlan, otherPlan, SetOpType.SET_OP_TYPE_INTERSECT, isAll)) + .build() + + def union( + otherPlan: proto.Relation, + isAll: Boolean = true, + byName: Boolean = false): proto.Relation = + proto.Relation + .newBuilder() + .setSetOp( + createSetOperation( + logicalPlan, + otherPlan, + SetOpType.SET_OP_TYPE_UNION, + isAll, + byName)) + .build() + + private def createSetOperation( + left: proto.Relation, + right: proto.Relation, + t: SetOpType, + isAll: Boolean = true, + byName: Boolean = false): proto.SetOperation.Builder = { + val setOp = proto.SetOperation + .newBuilder() + .setLeftInput(left) + .setRightInput(right) + .setSetOpType(t) + .setIsAll(isAll) + .setByName(byName) + setOp + } } } } diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index ebdb5a447b1..dda5415abd5 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.connect.planner +import scala.annotation.elidable.byName import scala.collection.JavaConverters._ import org.apache.spark.connect.proto @@ -24,9 +25,10 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.{logical, FullOuter, Inner, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter, UsingJoin} -import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, LogicalPlan, Sample, SubqueryAlias} +import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Except, Intersect, LogicalPlan, Sample, SubqueryAlias, Union} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.types._ @@ -58,8 +60,8 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) { case proto.Relation.RelTypeCase.LIMIT => transformLimit(rel.getLimit) case proto.Relation.RelTypeCase.OFFSET => transformOffset(rel.getOffset) case proto.Relation.RelTypeCase.JOIN => transformJoin(rel.getJoin) - case proto.Relation.RelTypeCase.UNION => transformUnion(rel.getUnion) case proto.Relation.RelTypeCase.DEDUPLICATE => transformDeduplicate(rel.getDeduplicate) + case proto.Relation.RelTypeCase.SET_OP => transformSetOperation(rel.getSetOp) case proto.Relation.RelTypeCase.SORT => transformSort(rel.getSort) case proto.Relation.RelTypeCase.AGGREGATE => transformAggregate(rel.getAggregate) case proto.Relation.RelTypeCase.SQL => transformSql(rel.getSql) @@ -282,15 +284,35 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) { Alias(transformExpression(alias.getExpr), alias.getName)() } - private def transformUnion(u: proto.Union): LogicalPlan = { - assert(u.getInputsCount == 2, "Union must have 2 inputs") - val plan = logical.Union(transformRelation(u.getInputs(0)), transformRelation(u.getInputs(1))) + private def transformSetOperation(u: proto.SetOperation): LogicalPlan = { + assert(u.hasLeftInput && u.hasRightInput, "Union must have 2 inputs") - u.getUnionType match { - case proto.Union.UnionType.UNION_TYPE_DISTINCT => logical.Distinct(plan) - case proto.Union.UnionType.UNION_TYPE_ALL => plan + u.getSetOpType match { + case proto.SetOperation.SetOpType.SET_OP_TYPE_EXCEPT => + if (u.getByName) { + throw InvalidPlanInput("Except does not support union_by_name") + } + Except(transformRelation(u.getLeftInput), transformRelation(u.getRightInput), u.getIsAll) + case proto.SetOperation.SetOpType.SET_OP_TYPE_INTERSECT => + if (u.getByName) { + throw InvalidPlanInput("Intersect does not support union_by_name") + } + Intersect( + transformRelation(u.getLeftInput), + transformRelation(u.getRightInput), + u.getIsAll) + case proto.SetOperation.SetOpType.SET_OP_TYPE_UNION => + val combinedUnion = CombineUnions( + Union( + Seq(transformRelation(u.getLeftInput), transformRelation(u.getRightInput)), + byName = u.getByName)) + if (u.getIsAll) { + combinedUnion + } else { + logical.Deduplicate(combinedUnion.output, combinedUnion) + } case _ => - throw InvalidPlanInput(s"Unsupported set operation ${u.getUnionTypeValue}") + throw InvalidPlanInput(s"Unsupported set operation ${u.getSetOpTypeValue}") } } diff --git a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala index 49072982c00..df3ce93f33c 100644 --- a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala +++ b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala @@ -156,9 +156,10 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest { test("Simple Union") { intercept[AssertionError]( - transform(proto.Relation.newBuilder.setUnion(proto.Union.newBuilder.build()).build)) + transform(proto.Relation.newBuilder.setSetOp(proto.SetOperation.newBuilder.build()).build)) val union = proto.Relation.newBuilder - .setUnion(proto.Union.newBuilder.addAllInputs(Seq(readRel, readRel).asJava).build()) + .setSetOp( + proto.SetOperation.newBuilder.setLeftInput(readRel).setRightInput(readRel).build()) .build() val msg = intercept[InvalidPlanInput] { transform(union) @@ -167,10 +168,12 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest { val res = transform( proto.Relation.newBuilder - .setUnion( - proto.Union.newBuilder - .addAllInputs(Seq(readRel, readRel).asJava) - .setUnionType(proto.Union.UnionType.UNION_TYPE_ALL) + .setSetOp( + proto.SetOperation.newBuilder + .setLeftInput(readRel) + .setRightInput(readRel) + .setSetOpType(proto.SetOperation.SetOpType.SET_OP_TYPE_UNION) + .setIsAll(true) .build()) .build()) assert(res.nodeName == "Union") @@ -301,4 +304,28 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest { } assert(e2.getMessage.contains("either deduplicate on all columns or a subset of columns")) } + + test("Test invalid intersect, except") { + // Except with union_by_name=true + val except = proto.SetOperation + .newBuilder() + .setLeftInput(readRel) + .setRightInput(readRel) + .setByName(true) + .setSetOpType(proto.SetOperation.SetOpType.SET_OP_TYPE_EXCEPT) + val e = + intercept[InvalidPlanInput](transform(proto.Relation.newBuilder.setSetOp(except).build())) + assert(e.getMessage.contains("Except does not support union_by_name")) + + // Intersect with union_by_name=true + val intersect = proto.SetOperation + .newBuilder() + .setLeftInput(readRel) + .setRightInput(readRel) + .setByName(true) + .setSetOpType(proto.SetOperation.SetOpType.SET_OP_TYPE_INTERSECT) + val e2 = intercept[InvalidPlanInput]( + transform(proto.Relation.newBuilder.setSetOp(intersect).build())) + assert(e2.getMessage.contains("Intersect does not support union_by_name")) + } } diff --git a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala index a38b1951eb2..3232901ca8e 100644 --- a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala +++ b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala @@ -173,6 +173,42 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest { comparePlans(connectPlan2, sparkPlan2) } + test("Test union, except, intersect") { + val connectPlan1 = connectTestRelation.except(connectTestRelation, isAll = false) + val sparkPlan1 = sparkTestRelation.except(sparkTestRelation) + comparePlans(connectPlan1, sparkPlan1) + + val connectPlan2 = connectTestRelation.except(connectTestRelation, isAll = true) + val sparkPlan2 = sparkTestRelation.exceptAll(sparkTestRelation) + comparePlans(connectPlan2, sparkPlan2) + + val connectPlan3 = connectTestRelation.intersect(connectTestRelation, isAll = false) + val sparkPlan3 = sparkTestRelation.intersect(sparkTestRelation) + comparePlans(connectPlan3, sparkPlan3) + + val connectPlan4 = connectTestRelation.intersect(connectTestRelation, isAll = true) + val sparkPlan4 = sparkTestRelation.intersectAll(sparkTestRelation) + comparePlans(connectPlan4, sparkPlan4) + + val connectPlan5 = connectTestRelation.union(connectTestRelation, isAll = true) + val sparkPlan5 = sparkTestRelation.union(sparkTestRelation) + comparePlans(connectPlan5, sparkPlan5) + + val connectPlan6 = connectTestRelation.union(connectTestRelation, isAll = false) + val sparkPlan6 = sparkTestRelation.union(sparkTestRelation).distinct() + comparePlans(connectPlan6, sparkPlan6) + + val connectPlan7 = + connectTestRelation.union(connectTestRelation2, isAll = true, byName = true) + val sparkPlan7 = sparkTestRelation.unionByName(sparkTestRelation2) + comparePlans(connectPlan7, sparkPlan7) + + val connectPlan8 = + connectTestRelation.union(connectTestRelation2, isAll = false, byName = true) + val sparkPlan8 = sparkTestRelation.unionByName(sparkTestRelation2).distinct() + comparePlans(connectPlan8, sparkPlan8) + } + private def createLocalRelationProtoByQualifiedAttributes( attrs: Seq[proto.Expression.QualifiedAttribute]): proto.Relation = { val localRelationBuilder = proto.LocalRelation.newBuilder() diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index d6b6f9e3b67..9f183c116eb 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -608,8 +608,10 @@ class UnionAll(LogicalPlan): def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation: assert self._child is not None rel = proto.Relation() - rel.union.inputs.extend([self._child.plan(session), self.other.plan(session)]) - rel.union.union_type = proto.Union.UnionType.UNION_TYPE_ALL + rel.set_op.left_input.CopyFrom(self._child.plan(session)) + rel.set_op.right_input.CopyFrom(self.other.plan(session)) + rel.set_op.set_op_type = proto.SetOperation.SET_OP_TYPE_UNION + rel.set_op.is_all = True return rel def print(self, indent: int = 0) -> str: diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py b/python/pyspark/sql/connect/proto/relations_pb2.py index 2a38a014926..973b322014e 100644 --- a/python/pyspark/sql/connect/proto/relations_pb2.py +++ b/python/pyspark/sql/connect/proto/relations_pb2.py @@ -32,7 +32,7 @@ from pyspark.sql.connect.proto import expressions_pb2 as spark_dot_connect_dot_e DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x1fspark/connect/expressions.proto"\x8f\x06\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66ilter\x18\x04 \x01(\x0b\x32\x15.spark.connect.FilterH\x00R\x06\x66ilter\x12)\n\x04join\x18\x05 \x01(\x0 [...] + b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x1fspark/connect/expressions.proto"\x97\x06\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66ilter\x18\x04 \x01(\x0b\x32\x15.spark.connect.FilterH\x00R\x06\x66ilter\x12)\n\x04join\x18\x05 \x01(\x0 [...] ) _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) @@ -44,55 +44,55 @@ if _descriptor._USE_C_DESCRIPTORS == False: _READ_DATASOURCE_OPTIONSENTRY._options = None _READ_DATASOURCE_OPTIONSENTRY._serialized_options = b"8\001" _RELATION._serialized_start = 82 - _RELATION._serialized_end = 865 - _UNKNOWN._serialized_start = 867 - _UNKNOWN._serialized_end = 876 - _RELATIONCOMMON._serialized_start = 878 - _RELATIONCOMMON._serialized_end = 949 - _SQL._serialized_start = 951 - _SQL._serialized_end = 978 - _READ._serialized_start = 981 - _READ._serialized_end = 1391 - _READ_NAMEDTABLE._serialized_start = 1123 - _READ_NAMEDTABLE._serialized_end = 1184 - _READ_DATASOURCE._serialized_start = 1187 - _READ_DATASOURCE._serialized_end = 1378 - _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 1320 - _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 1378 - _PROJECT._serialized_start = 1393 - _PROJECT._serialized_end = 1510 - _FILTER._serialized_start = 1512 - _FILTER._serialized_end = 1624 - _JOIN._serialized_start = 1627 - _JOIN._serialized_end = 2077 - _JOIN_JOINTYPE._serialized_start = 1890 - _JOIN_JOINTYPE._serialized_end = 2077 - _UNION._serialized_start = 2080 - _UNION._serialized_end = 2285 - _UNION_UNIONTYPE._serialized_start = 2201 - _UNION_UNIONTYPE._serialized_end = 2285 - _LIMIT._serialized_start = 2287 - _LIMIT._serialized_end = 2363 - _OFFSET._serialized_start = 2365 - _OFFSET._serialized_end = 2444 - _AGGREGATE._serialized_start = 2447 - _AGGREGATE._serialized_end = 2772 - _AGGREGATE_AGGREGATEFUNCTION._serialized_start = 2676 - _AGGREGATE_AGGREGATEFUNCTION._serialized_end = 2772 - _SORT._serialized_start = 2775 - _SORT._serialized_end = 3277 - _SORT_SORTFIELD._serialized_start = 2895 - _SORT_SORTFIELD._serialized_end = 3083 - _SORT_SORTDIRECTION._serialized_start = 3085 - _SORT_SORTDIRECTION._serialized_end = 3193 - _SORT_SORTNULLS._serialized_start = 3195 - _SORT_SORTNULLS._serialized_end = 3277 - _DEDUPLICATE._serialized_start = 3280 - _DEDUPLICATE._serialized_end = 3422 - _LOCALRELATION._serialized_start = 3424 - _LOCALRELATION._serialized_end = 3517 - _SAMPLE._serialized_start = 3520 - _SAMPLE._serialized_end = 3760 - _SAMPLE_SEED._serialized_start = 3734 - _SAMPLE_SEED._serialized_end = 3760 + _RELATION._serialized_end = 873 + _UNKNOWN._serialized_start = 875 + _UNKNOWN._serialized_end = 884 + _RELATIONCOMMON._serialized_start = 886 + _RELATIONCOMMON._serialized_end = 957 + _SQL._serialized_start = 959 + _SQL._serialized_end = 986 + _READ._serialized_start = 989 + _READ._serialized_end = 1399 + _READ_NAMEDTABLE._serialized_start = 1131 + _READ_NAMEDTABLE._serialized_end = 1192 + _READ_DATASOURCE._serialized_start = 1195 + _READ_DATASOURCE._serialized_end = 1386 + _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 1328 + _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 1386 + _PROJECT._serialized_start = 1401 + _PROJECT._serialized_end = 1518 + _FILTER._serialized_start = 1520 + _FILTER._serialized_end = 1632 + _JOIN._serialized_start = 1635 + _JOIN._serialized_end = 2085 + _JOIN_JOINTYPE._serialized_start = 1898 + _JOIN_JOINTYPE._serialized_end = 2085 + _SETOPERATION._serialized_start = 2088 + _SETOPERATION._serialized_end = 2451 + _SETOPERATION_SETOPTYPE._serialized_start = 2337 + _SETOPERATION_SETOPTYPE._serialized_end = 2451 + _LIMIT._serialized_start = 2453 + _LIMIT._serialized_end = 2529 + _OFFSET._serialized_start = 2531 + _OFFSET._serialized_end = 2610 + _AGGREGATE._serialized_start = 2613 + _AGGREGATE._serialized_end = 2938 + _AGGREGATE_AGGREGATEFUNCTION._serialized_start = 2842 + _AGGREGATE_AGGREGATEFUNCTION._serialized_end = 2938 + _SORT._serialized_start = 2941 + _SORT._serialized_end = 3443 + _SORT_SORTFIELD._serialized_start = 3061 + _SORT_SORTFIELD._serialized_end = 3249 + _SORT_SORTDIRECTION._serialized_start = 3251 + _SORT_SORTDIRECTION._serialized_end = 3359 + _SORT_SORTNULLS._serialized_start = 3361 + _SORT_SORTNULLS._serialized_end = 3443 + _DEDUPLICATE._serialized_start = 3446 + _DEDUPLICATE._serialized_end = 3588 + _LOCALRELATION._serialized_start = 3590 + _LOCALRELATION._serialized_end = 3683 + _SAMPLE._serialized_start = 3686 + _SAMPLE._serialized_end = 3926 + _SAMPLE_SEED._serialized_start = 3900 + _SAMPLE_SEED._serialized_end = 3926 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi b/python/pyspark/sql/connect/proto/relations_pb2.pyi index d3186c4e3df..2c2ce8d0711 100644 --- a/python/pyspark/sql/connect/proto/relations_pb2.pyi +++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi @@ -64,7 +64,7 @@ class Relation(google.protobuf.message.Message): PROJECT_FIELD_NUMBER: builtins.int FILTER_FIELD_NUMBER: builtins.int JOIN_FIELD_NUMBER: builtins.int - UNION_FIELD_NUMBER: builtins.int + SET_OP_FIELD_NUMBER: builtins.int SORT_FIELD_NUMBER: builtins.int LIMIT_FIELD_NUMBER: builtins.int AGGREGATE_FIELD_NUMBER: builtins.int @@ -85,7 +85,7 @@ class Relation(google.protobuf.message.Message): @property def join(self) -> global___Join: ... @property - def union(self) -> global___Union: ... + def set_op(self) -> global___SetOperation: ... @property def sort(self) -> global___Sort: ... @property @@ -112,7 +112,7 @@ class Relation(google.protobuf.message.Message): project: global___Project | None = ..., filter: global___Filter | None = ..., join: global___Join | None = ..., - union: global___Union | None = ..., + set_op: global___SetOperation | None = ..., sort: global___Sort | None = ..., limit: global___Limit | None = ..., aggregate: global___Aggregate | None = ..., @@ -150,12 +150,12 @@ class Relation(google.protobuf.message.Message): b"rel_type", "sample", b"sample", + "set_op", + b"set_op", "sort", b"sort", "sql", b"sql", - "union", - b"union", "unknown", b"unknown", ], @@ -187,12 +187,12 @@ class Relation(google.protobuf.message.Message): b"rel_type", "sample", b"sample", + "set_op", + b"set_op", "sort", b"sort", "sql", b"sql", - "union", - b"union", "unknown", b"unknown", ], @@ -204,7 +204,7 @@ class Relation(google.protobuf.message.Message): "project", "filter", "join", - "union", + "set_op", "sort", "limit", "aggregate", @@ -518,48 +518,77 @@ class Join(google.protobuf.message.Message): global___Join = Join -class Union(google.protobuf.message.Message): - """Relation of type [[Union]], at least one input must be set.""" +class SetOperation(google.protobuf.message.Message): + """Relation of type [[SetOperation]]""" DESCRIPTOR: google.protobuf.descriptor.Descriptor - class _UnionType: + class _SetOpType: ValueType = typing.NewType("ValueType", builtins.int) V: typing_extensions.TypeAlias = ValueType - class _UnionTypeEnumTypeWrapper( - google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[Union._UnionType.ValueType], + class _SetOpTypeEnumTypeWrapper( + google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[ + SetOperation._SetOpType.ValueType + ], builtins.type, ): # noqa: F821 DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor - UNION_TYPE_UNSPECIFIED: Union._UnionType.ValueType # 0 - UNION_TYPE_DISTINCT: Union._UnionType.ValueType # 1 - UNION_TYPE_ALL: Union._UnionType.ValueType # 2 - - class UnionType(_UnionType, metaclass=_UnionTypeEnumTypeWrapper): ... - UNION_TYPE_UNSPECIFIED: Union.UnionType.ValueType # 0 - UNION_TYPE_DISTINCT: Union.UnionType.ValueType # 1 - UNION_TYPE_ALL: Union.UnionType.ValueType # 2 - - INPUTS_FIELD_NUMBER: builtins.int - UNION_TYPE_FIELD_NUMBER: builtins.int - @property - def inputs( - self, - ) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___Relation]: ... - union_type: global___Union.UnionType.ValueType + SET_OP_TYPE_UNSPECIFIED: SetOperation._SetOpType.ValueType # 0 + SET_OP_TYPE_INTERSECT: SetOperation._SetOpType.ValueType # 1 + SET_OP_TYPE_UNION: SetOperation._SetOpType.ValueType # 2 + SET_OP_TYPE_EXCEPT: SetOperation._SetOpType.ValueType # 3 + + class SetOpType(_SetOpType, metaclass=_SetOpTypeEnumTypeWrapper): ... + SET_OP_TYPE_UNSPECIFIED: SetOperation.SetOpType.ValueType # 0 + SET_OP_TYPE_INTERSECT: SetOperation.SetOpType.ValueType # 1 + SET_OP_TYPE_UNION: SetOperation.SetOpType.ValueType # 2 + SET_OP_TYPE_EXCEPT: SetOperation.SetOpType.ValueType # 3 + + LEFT_INPUT_FIELD_NUMBER: builtins.int + RIGHT_INPUT_FIELD_NUMBER: builtins.int + SET_OP_TYPE_FIELD_NUMBER: builtins.int + IS_ALL_FIELD_NUMBER: builtins.int + BY_NAME_FIELD_NUMBER: builtins.int + @property + def left_input(self) -> global___Relation: ... + @property + def right_input(self) -> global___Relation: ... + set_op_type: global___SetOperation.SetOpType.ValueType + is_all: builtins.bool + by_name: builtins.bool def __init__( self, *, - inputs: collections.abc.Iterable[global___Relation] | None = ..., - union_type: global___Union.UnionType.ValueType = ..., + left_input: global___Relation | None = ..., + right_input: global___Relation | None = ..., + set_op_type: global___SetOperation.SetOpType.ValueType = ..., + is_all: builtins.bool = ..., + by_name: builtins.bool = ..., ) -> None: ... + def HasField( + self, + field_name: typing_extensions.Literal[ + "left_input", b"left_input", "right_input", b"right_input" + ], + ) -> builtins.bool: ... def ClearField( self, - field_name: typing_extensions.Literal["inputs", b"inputs", "union_type", b"union_type"], + field_name: typing_extensions.Literal[ + "by_name", + b"by_name", + "is_all", + b"is_all", + "left_input", + b"left_input", + "right_input", + b"right_input", + "set_op_type", + b"set_op_type", + ], ) -> None: ... -global___Union = Union +global___SetOperation = SetOperation class Limit(google.protobuf.message.Message): """Relation of type [[Limit]] that is used to `limit` rows from the input relation.""" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org