This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 31bff52431d [SPARK-40713][CONNECT] Improve SET operation support in the proto and the server
31bff52431d is described below

commit 31bff52431d25dd36253d3cafa17c233ff6a54f2
Author: Rui Wang <rui.w...@databricks.com>
AuthorDate: Mon Oct 31 14:24:38 2022 +0800

    [SPARK-40713][CONNECT] Improve SET operation support in the proto and the server
    
    ### What changes were proposed in this pull request?
    
    1. Rename `Union` to `SetOperation` and make it cover `except`, `intersect`, `union`.
    2. Improve server side support for `SetOperation`.
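
    For illustration, a minimal Scala sketch of the new message shape (the builder calls are the generated protobuf ones exercised by the tests in this PR; `unionAll` itself is a hypothetical helper, not code added here):

    ```scala
    import org.apache.spark.connect.proto

    // Build `left UNION ALL right` with the new SetOperation message.
    // Before this change the same plan used proto.Union with repeated
    // inputs and UnionType.UNION_TYPE_ALL.
    def unionAll(left: proto.Relation, right: proto.Relation): proto.Relation =
      proto.Relation
        .newBuilder()
        .setSetOp(
          proto.SetOperation
            .newBuilder()
            .setLeftInput(left)
            .setRightInput(right)
            .setSetOpType(proto.SetOperation.SetOpType.SET_OP_TYPE_UNION)
            .setIsAll(true)
            .build())
        .build()
    ```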
    
    ### Why are the changes needed?
    
    The proto previously modeled only `Union`; a single `SetOperation` message covers `union`, `except`, and `intersect`, so the server-side planner can support all three set operations.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No

    ### How was this patch tested?
    
    UT
    
    Closes #38166 from amaliujia/SPARK-40713.
    
    Authored-by: Rui Wang <rui.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../main/protobuf/spark/connect/relations.proto    |  24 +++--
 .../org/apache/spark/sql/connect/dsl/package.scala |  47 ++++++++++
 .../sql/connect/planner/SparkConnectPlanner.scala  |  40 ++++++--
 .../connect/planner/SparkConnectPlannerSuite.scala |  39 ++++++--
 .../connect/planner/SparkConnectProtoSuite.scala   |  36 +++++++
 python/pyspark/sql/connect/plan.py                 |   6 +-
 python/pyspark/sql/connect/proto/relations_pb2.py  | 104 ++++++++++-----------
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  95 ++++++++++++-------
 8 files changed, 279 insertions(+), 112 deletions(-)

diff --git a/connector/connect/src/main/protobuf/spark/connect/relations.proto b/connector/connect/src/main/protobuf/spark/connect/relations.proto
index 94010487ee5..8421535dd8e 100644
--- a/connector/connect/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/relations.proto
@@ -35,7 +35,7 @@ message Relation {
     Project project = 3;
     Filter filter = 4;
     Join join = 5;
-    Union union = 6;
+    SetOperation set_op = 6;
     Sort sort = 7;
     Limit limit = 8;
     Aggregate aggregate = 9;
@@ -127,15 +127,19 @@ message Join {
   }
 }
 
-// Relation of type [[Union]], at least one input must be set.
-message Union {
-  repeated Relation inputs = 1;
-  UnionType union_type = 2;
-
-  enum UnionType {
-    UNION_TYPE_UNSPECIFIED = 0;
-    UNION_TYPE_DISTINCT = 1;
-    UNION_TYPE_ALL = 2;
+// Relation of type [[SetOperation]]
+message SetOperation {
+  Relation left_input = 1;
+  Relation right_input = 2;
+  SetOpType set_op_type = 3;
+  bool is_all = 4;
+  bool by_name = 5;
+
+  enum SetOpType {
+    SET_OP_TYPE_UNSPECIFIED = 0;
+    SET_OP_TYPE_INTERSECT = 1;
+    SET_OP_TYPE_UNION = 2;
+    SET_OP_TYPE_EXCEPT = 3;
   }
 }
 
diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index a9a97e740d8..9ffc4c4a1fe 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -21,6 +21,7 @@ import scala.language.implicitConversions
 
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.Join.JoinType
+import org.apache.spark.connect.proto.SetOperation.SetOpType
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.connect.planner.DataTypeProtoConverter
 
@@ -322,6 +323,52 @@ package object dsl {
         // resolution only by name in the analyzer.
         proto.Relation.newBuilder().setAggregate(agg.build()).build()
       }
+
+      def except(otherPlan: proto.Relation, isAll: Boolean): proto.Relation = {
+        proto.Relation
+          .newBuilder()
+          .setSetOp(
+            createSetOperation(logicalPlan, otherPlan, SetOpType.SET_OP_TYPE_EXCEPT, isAll))
+          .build()
+      }
+
+      def intersect(otherPlan: proto.Relation, isAll: Boolean): proto.Relation =
+        proto.Relation
+          .newBuilder()
+          .setSetOp(
+            createSetOperation(logicalPlan, otherPlan, SetOpType.SET_OP_TYPE_INTERSECT, isAll))
+          .build()
+
+      def union(
+          otherPlan: proto.Relation,
+          isAll: Boolean = true,
+          byName: Boolean = false): proto.Relation =
+        proto.Relation
+          .newBuilder()
+          .setSetOp(
+            createSetOperation(
+              logicalPlan,
+              otherPlan,
+              SetOpType.SET_OP_TYPE_UNION,
+              isAll,
+              byName))
+          .build()
+
+      private def createSetOperation(
+          left: proto.Relation,
+          right: proto.Relation,
+          t: SetOpType,
+          isAll: Boolean = true,
+          byName: Boolean = false): proto.SetOperation.Builder = {
+        val setOp = proto.SetOperation
+          .newBuilder()
+          .setLeftInput(left)
+          .setRightInput(right)
+          .setSetOpType(t)
+          .setIsAll(isAll)
+          .setByName(byName)
+        setOp
+      }
     }
   }
 }
diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index ebdb5a447b1..dda5415abd5 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.connect.planner
 
+import scala.annotation.elidable.byName
 import scala.collection.JavaConverters._
 
 import org.apache.spark.connect.proto
@@ -24,9 +25,10 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.{logical, FullOuter, Inner, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter, UsingJoin}
-import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, LogicalPlan, Sample, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Except, Intersect, LogicalPlan, Sample, SubqueryAlias, Union}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.types._
@@ -58,8 +60,8 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {
       case proto.Relation.RelTypeCase.LIMIT => transformLimit(rel.getLimit)
       case proto.Relation.RelTypeCase.OFFSET => transformOffset(rel.getOffset)
       case proto.Relation.RelTypeCase.JOIN => transformJoin(rel.getJoin)
-      case proto.Relation.RelTypeCase.UNION => transformUnion(rel.getUnion)
       case proto.Relation.RelTypeCase.DEDUPLICATE => transformDeduplicate(rel.getDeduplicate)
+      case proto.Relation.RelTypeCase.SET_OP => transformSetOperation(rel.getSetOp)
       case proto.Relation.RelTypeCase.SORT => transformSort(rel.getSort)
       case proto.Relation.RelTypeCase.AGGREGATE => transformAggregate(rel.getAggregate)
       case proto.Relation.RelTypeCase.SQL => transformSql(rel.getSql)
@@ -282,15 +284,35 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {
     Alias(transformExpression(alias.getExpr), alias.getName)()
   }
 
-  private def transformUnion(u: proto.Union): LogicalPlan = {
-    assert(u.getInputsCount == 2, "Union must have 2 inputs")
-    val plan = logical.Union(transformRelation(u.getInputs(0)), transformRelation(u.getInputs(1)))
+  private def transformSetOperation(u: proto.SetOperation): LogicalPlan = {
+    assert(u.hasLeftInput && u.hasRightInput, "Union must have 2 inputs")
 
-    u.getUnionType match {
-      case proto.Union.UnionType.UNION_TYPE_DISTINCT => logical.Distinct(plan)
-      case proto.Union.UnionType.UNION_TYPE_ALL => plan
+    u.getSetOpType match {
+      case proto.SetOperation.SetOpType.SET_OP_TYPE_EXCEPT =>
+        if (u.getByName) {
+          throw InvalidPlanInput("Except does not support union_by_name")
+        }
+        Except(transformRelation(u.getLeftInput), transformRelation(u.getRightInput), u.getIsAll)
+      case proto.SetOperation.SetOpType.SET_OP_TYPE_INTERSECT =>
+        if (u.getByName) {
+          throw InvalidPlanInput("Intersect does not support union_by_name")
+        }
+        Intersect(
+          transformRelation(u.getLeftInput),
+          transformRelation(u.getRightInput),
+          u.getIsAll)
+      case proto.SetOperation.SetOpType.SET_OP_TYPE_UNION =>
+        val combinedUnion = CombineUnions(
+          Union(
+            Seq(transformRelation(u.getLeftInput), transformRelation(u.getRightInput)),
+            byName = u.getByName))
+        if (u.getIsAll) {
+          combinedUnion
+        } else {
+          logical.Deduplicate(combinedUnion.output, combinedUnion)
+        }
       case _ =>
-        throw InvalidPlanInput(s"Unsupported set operation ${u.getUnionTypeValue}")
+        throw InvalidPlanInput(s"Unsupported set operation ${u.getSetOpTypeValue}")
     }
   }
 
diff --git a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
index 49072982c00..df3ce93f33c 100644
--- a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
+++ b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
@@ -156,9 +156,10 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest {
 
   test("Simple Union") {
     intercept[AssertionError](
-      transform(proto.Relation.newBuilder.setUnion(proto.Union.newBuilder.build()).build))
+      transform(proto.Relation.newBuilder.setSetOp(proto.SetOperation.newBuilder.build()).build))
     val union = proto.Relation.newBuilder
-      .setUnion(proto.Union.newBuilder.addAllInputs(Seq(readRel, readRel).asJava).build())
+      .setSetOp(
+        proto.SetOperation.newBuilder.setLeftInput(readRel).setRightInput(readRel).build())
       .build()
     val msg = intercept[InvalidPlanInput] {
       transform(union)
@@ -167,10 +168,12 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest {
 
     val res = transform(
       proto.Relation.newBuilder
-        .setUnion(
-          proto.Union.newBuilder
-            .addAllInputs(Seq(readRel, readRel).asJava)
-            .setUnionType(proto.Union.UnionType.UNION_TYPE_ALL)
+        .setSetOp(
+          proto.SetOperation.newBuilder
+            .setLeftInput(readRel)
+            .setRightInput(readRel)
+            .setSetOpType(proto.SetOperation.SetOpType.SET_OP_TYPE_UNION)
+            .setIsAll(true)
             .build())
         .build())
     assert(res.nodeName == "Union")
@@ -301,4 +304,28 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest {
     }
     assert(e2.getMessage.contains("either deduplicate on all columns or a subset of columns"))
   }
+
+  test("Test invalid intersect, except") {
+    // Except with union_by_name=true
+    val except = proto.SetOperation
+      .newBuilder()
+      .setLeftInput(readRel)
+      .setRightInput(readRel)
+      .setByName(true)
+      .setSetOpType(proto.SetOperation.SetOpType.SET_OP_TYPE_EXCEPT)
+    val e =
+      intercept[InvalidPlanInput](transform(proto.Relation.newBuilder.setSetOp(except).build()))
+    assert(e.getMessage.contains("Except does not support union_by_name"))
+
+    // Intersect with union_by_name=true
+    val intersect = proto.SetOperation
+      .newBuilder()
+      .setLeftInput(readRel)
+      .setRightInput(readRel)
+      .setByName(true)
+      .setSetOpType(proto.SetOperation.SetOpType.SET_OP_TYPE_INTERSECT)
+    val e2 = intercept[InvalidPlanInput](
+      transform(proto.Relation.newBuilder.setSetOp(intersect).build()))
+    assert(e2.getMessage.contains("Intersect does not support union_by_name"))
+  }
 }
diff --git a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index a38b1951eb2..3232901ca8e 100644
--- a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -173,6 +173,42 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
     comparePlans(connectPlan2, sparkPlan2)
   }
 
+  test("Test union, except, intersect") {
+    val connectPlan1 = connectTestRelation.except(connectTestRelation, isAll = 
false)
+    val sparkPlan1 = sparkTestRelation.except(sparkTestRelation)
+    comparePlans(connectPlan1, sparkPlan1)
+
+    val connectPlan2 = connectTestRelation.except(connectTestRelation, isAll = 
true)
+    val sparkPlan2 = sparkTestRelation.exceptAll(sparkTestRelation)
+    comparePlans(connectPlan2, sparkPlan2)
+
+    val connectPlan3 = connectTestRelation.intersect(connectTestRelation, 
isAll = false)
+    val sparkPlan3 = sparkTestRelation.intersect(sparkTestRelation)
+    comparePlans(connectPlan3, sparkPlan3)
+
+    val connectPlan4 = connectTestRelation.intersect(connectTestRelation, 
isAll = true)
+    val sparkPlan4 = sparkTestRelation.intersectAll(sparkTestRelation)
+    comparePlans(connectPlan4, sparkPlan4)
+
+    val connectPlan5 = connectTestRelation.union(connectTestRelation, isAll = 
true)
+    val sparkPlan5 = sparkTestRelation.union(sparkTestRelation)
+    comparePlans(connectPlan5, sparkPlan5)
+
+    val connectPlan6 = connectTestRelation.union(connectTestRelation, isAll = 
false)
+    val sparkPlan6 = sparkTestRelation.union(sparkTestRelation).distinct()
+    comparePlans(connectPlan6, sparkPlan6)
+
+    val connectPlan7 =
+      connectTestRelation.union(connectTestRelation2, isAll = true, byName = 
true)
+    val sparkPlan7 = sparkTestRelation.unionByName(sparkTestRelation2)
+    comparePlans(connectPlan7, sparkPlan7)
+
+    val connectPlan8 =
+      connectTestRelation.union(connectTestRelation2, isAll = false, byName = 
true)
+    val sparkPlan8 = 
sparkTestRelation.unionByName(sparkTestRelation2).distinct()
+    comparePlans(connectPlan8, sparkPlan8)
+  }
+
   private def createLocalRelationProtoByQualifiedAttributes(
       attrs: Seq[proto.Expression.QualifiedAttribute]): proto.Relation = {
     val localRelationBuilder = proto.LocalRelation.newBuilder()
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index d6b6f9e3b67..9f183c116eb 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -608,8 +608,10 @@ class UnionAll(LogicalPlan):
     def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation:
         assert self._child is not None
         rel = proto.Relation()
-        rel.union.inputs.extend([self._child.plan(session), self.other.plan(session)])
-        rel.union.union_type = proto.Union.UnionType.UNION_TYPE_ALL
+        rel.set_op.left_input.CopyFrom(self._child.plan(session))
+        rel.set_op.right_input.CopyFrom(self.other.plan(session))
+        rel.set_op.set_op_type = proto.SetOperation.SET_OP_TYPE_UNION
+        rel.set_op.is_all = True
         return rel
 
     def print(self, indent: int = 0) -> str:
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py b/python/pyspark/sql/connect/proto/relations_pb2.py
index 2a38a014926..973b322014e 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.py
+++ b/python/pyspark/sql/connect/proto/relations_pb2.py
@@ -32,7 +32,7 @@ from pyspark.sql.connect.proto import expressions_pb2 as spark_dot_connect_dot_e
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x1fspark/connect/expressions.proto"\x8f\x06\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66ilter\x18\x04 \x01(\x0b\x32\x15.spark.connect.FilterH\x00R\x06\x66ilter\x12)\n\x04join\x18\x05 \x01(\x0 [...]
+    b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x1fspark/connect/expressions.proto"\x97\x06\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66ilter\x18\x04 \x01(\x0b\x32\x15.spark.connect.FilterH\x00R\x06\x66ilter\x12)\n\x04join\x18\x05 \x01(\x0 [...]
 )
 
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
@@ -44,55 +44,55 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _READ_DATASOURCE_OPTIONSENTRY._options = None
     _READ_DATASOURCE_OPTIONSENTRY._serialized_options = b"8\001"
     _RELATION._serialized_start = 82
-    _RELATION._serialized_end = 865
-    _UNKNOWN._serialized_start = 867
-    _UNKNOWN._serialized_end = 876
-    _RELATIONCOMMON._serialized_start = 878
-    _RELATIONCOMMON._serialized_end = 949
-    _SQL._serialized_start = 951
-    _SQL._serialized_end = 978
-    _READ._serialized_start = 981
-    _READ._serialized_end = 1391
-    _READ_NAMEDTABLE._serialized_start = 1123
-    _READ_NAMEDTABLE._serialized_end = 1184
-    _READ_DATASOURCE._serialized_start = 1187
-    _READ_DATASOURCE._serialized_end = 1378
-    _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 1320
-    _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 1378
-    _PROJECT._serialized_start = 1393
-    _PROJECT._serialized_end = 1510
-    _FILTER._serialized_start = 1512
-    _FILTER._serialized_end = 1624
-    _JOIN._serialized_start = 1627
-    _JOIN._serialized_end = 2077
-    _JOIN_JOINTYPE._serialized_start = 1890
-    _JOIN_JOINTYPE._serialized_end = 2077
-    _UNION._serialized_start = 2080
-    _UNION._serialized_end = 2285
-    _UNION_UNIONTYPE._serialized_start = 2201
-    _UNION_UNIONTYPE._serialized_end = 2285
-    _LIMIT._serialized_start = 2287
-    _LIMIT._serialized_end = 2363
-    _OFFSET._serialized_start = 2365
-    _OFFSET._serialized_end = 2444
-    _AGGREGATE._serialized_start = 2447
-    _AGGREGATE._serialized_end = 2772
-    _AGGREGATE_AGGREGATEFUNCTION._serialized_start = 2676
-    _AGGREGATE_AGGREGATEFUNCTION._serialized_end = 2772
-    _SORT._serialized_start = 2775
-    _SORT._serialized_end = 3277
-    _SORT_SORTFIELD._serialized_start = 2895
-    _SORT_SORTFIELD._serialized_end = 3083
-    _SORT_SORTDIRECTION._serialized_start = 3085
-    _SORT_SORTDIRECTION._serialized_end = 3193
-    _SORT_SORTNULLS._serialized_start = 3195
-    _SORT_SORTNULLS._serialized_end = 3277
-    _DEDUPLICATE._serialized_start = 3280
-    _DEDUPLICATE._serialized_end = 3422
-    _LOCALRELATION._serialized_start = 3424
-    _LOCALRELATION._serialized_end = 3517
-    _SAMPLE._serialized_start = 3520
-    _SAMPLE._serialized_end = 3760
-    _SAMPLE_SEED._serialized_start = 3734
-    _SAMPLE_SEED._serialized_end = 3760
+    _RELATION._serialized_end = 873
+    _UNKNOWN._serialized_start = 875
+    _UNKNOWN._serialized_end = 884
+    _RELATIONCOMMON._serialized_start = 886
+    _RELATIONCOMMON._serialized_end = 957
+    _SQL._serialized_start = 959
+    _SQL._serialized_end = 986
+    _READ._serialized_start = 989
+    _READ._serialized_end = 1399
+    _READ_NAMEDTABLE._serialized_start = 1131
+    _READ_NAMEDTABLE._serialized_end = 1192
+    _READ_DATASOURCE._serialized_start = 1195
+    _READ_DATASOURCE._serialized_end = 1386
+    _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 1328
+    _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 1386
+    _PROJECT._serialized_start = 1401
+    _PROJECT._serialized_end = 1518
+    _FILTER._serialized_start = 1520
+    _FILTER._serialized_end = 1632
+    _JOIN._serialized_start = 1635
+    _JOIN._serialized_end = 2085
+    _JOIN_JOINTYPE._serialized_start = 1898
+    _JOIN_JOINTYPE._serialized_end = 2085
+    _SETOPERATION._serialized_start = 2088
+    _SETOPERATION._serialized_end = 2451
+    _SETOPERATION_SETOPTYPE._serialized_start = 2337
+    _SETOPERATION_SETOPTYPE._serialized_end = 2451
+    _LIMIT._serialized_start = 2453
+    _LIMIT._serialized_end = 2529
+    _OFFSET._serialized_start = 2531
+    _OFFSET._serialized_end = 2610
+    _AGGREGATE._serialized_start = 2613
+    _AGGREGATE._serialized_end = 2938
+    _AGGREGATE_AGGREGATEFUNCTION._serialized_start = 2842
+    _AGGREGATE_AGGREGATEFUNCTION._serialized_end = 2938
+    _SORT._serialized_start = 2941
+    _SORT._serialized_end = 3443
+    _SORT_SORTFIELD._serialized_start = 3061
+    _SORT_SORTFIELD._serialized_end = 3249
+    _SORT_SORTDIRECTION._serialized_start = 3251
+    _SORT_SORTDIRECTION._serialized_end = 3359
+    _SORT_SORTNULLS._serialized_start = 3361
+    _SORT_SORTNULLS._serialized_end = 3443
+    _DEDUPLICATE._serialized_start = 3446
+    _DEDUPLICATE._serialized_end = 3588
+    _LOCALRELATION._serialized_start = 3590
+    _LOCALRELATION._serialized_end = 3683
+    _SAMPLE._serialized_start = 3686
+    _SAMPLE._serialized_end = 3926
+    _SAMPLE_SEED._serialized_start = 3900
+    _SAMPLE_SEED._serialized_end = 3926
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi b/python/pyspark/sql/connect/proto/relations_pb2.pyi
index d3186c4e3df..2c2ce8d0711 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi
@@ -64,7 +64,7 @@ class Relation(google.protobuf.message.Message):
     PROJECT_FIELD_NUMBER: builtins.int
     FILTER_FIELD_NUMBER: builtins.int
     JOIN_FIELD_NUMBER: builtins.int
-    UNION_FIELD_NUMBER: builtins.int
+    SET_OP_FIELD_NUMBER: builtins.int
     SORT_FIELD_NUMBER: builtins.int
     LIMIT_FIELD_NUMBER: builtins.int
     AGGREGATE_FIELD_NUMBER: builtins.int
@@ -85,7 +85,7 @@ class Relation(google.protobuf.message.Message):
     @property
     def join(self) -> global___Join: ...
     @property
-    def union(self) -> global___Union: ...
+    def set_op(self) -> global___SetOperation: ...
     @property
     def sort(self) -> global___Sort: ...
     @property
@@ -112,7 +112,7 @@ class Relation(google.protobuf.message.Message):
         project: global___Project | None = ...,
         filter: global___Filter | None = ...,
         join: global___Join | None = ...,
-        union: global___Union | None = ...,
+        set_op: global___SetOperation | None = ...,
         sort: global___Sort | None = ...,
         limit: global___Limit | None = ...,
         aggregate: global___Aggregate | None = ...,
@@ -150,12 +150,12 @@ class Relation(google.protobuf.message.Message):
             b"rel_type",
             "sample",
             b"sample",
+            "set_op",
+            b"set_op",
             "sort",
             b"sort",
             "sql",
             b"sql",
-            "union",
-            b"union",
             "unknown",
             b"unknown",
         ],
@@ -187,12 +187,12 @@ class Relation(google.protobuf.message.Message):
             b"rel_type",
             "sample",
             b"sample",
+            "set_op",
+            b"set_op",
             "sort",
             b"sort",
             "sql",
             b"sql",
-            "union",
-            b"union",
             "unknown",
             b"unknown",
         ],
@@ -204,7 +204,7 @@ class Relation(google.protobuf.message.Message):
         "project",
         "filter",
         "join",
-        "union",
+        "set_op",
         "sort",
         "limit",
         "aggregate",
@@ -518,48 +518,77 @@ class Join(google.protobuf.message.Message):
 
 global___Join = Join
 
-class Union(google.protobuf.message.Message):
-    """Relation of type [[Union]], at least one input must be set."""
+class SetOperation(google.protobuf.message.Message):
+    """Relation of type [[SetOperation]]"""
 
     DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
-    class _UnionType:
+    class _SetOpType:
         ValueType = typing.NewType("ValueType", builtins.int)
         V: typing_extensions.TypeAlias = ValueType
 
-    class _UnionTypeEnumTypeWrapper(
-        google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[Union._UnionType.ValueType],
+    class _SetOpTypeEnumTypeWrapper(
+        google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[
+            SetOperation._SetOpType.ValueType
+        ],
         builtins.type,
     ):  # noqa: F821
         DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
-        UNION_TYPE_UNSPECIFIED: Union._UnionType.ValueType  # 0
-        UNION_TYPE_DISTINCT: Union._UnionType.ValueType  # 1
-        UNION_TYPE_ALL: Union._UnionType.ValueType  # 2
-
-    class UnionType(_UnionType, metaclass=_UnionTypeEnumTypeWrapper): ...
-    UNION_TYPE_UNSPECIFIED: Union.UnionType.ValueType  # 0
-    UNION_TYPE_DISTINCT: Union.UnionType.ValueType  # 1
-    UNION_TYPE_ALL: Union.UnionType.ValueType  # 2
-
-    INPUTS_FIELD_NUMBER: builtins.int
-    UNION_TYPE_FIELD_NUMBER: builtins.int
-    @property
-    def inputs(
-        self,
-    ) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___Relation]: ...
-    union_type: global___Union.UnionType.ValueType
+        SET_OP_TYPE_UNSPECIFIED: SetOperation._SetOpType.ValueType  # 0
+        SET_OP_TYPE_INTERSECT: SetOperation._SetOpType.ValueType  # 1
+        SET_OP_TYPE_UNION: SetOperation._SetOpType.ValueType  # 2
+        SET_OP_TYPE_EXCEPT: SetOperation._SetOpType.ValueType  # 3
+
+    class SetOpType(_SetOpType, metaclass=_SetOpTypeEnumTypeWrapper): ...
+    SET_OP_TYPE_UNSPECIFIED: SetOperation.SetOpType.ValueType  # 0
+    SET_OP_TYPE_INTERSECT: SetOperation.SetOpType.ValueType  # 1
+    SET_OP_TYPE_UNION: SetOperation.SetOpType.ValueType  # 2
+    SET_OP_TYPE_EXCEPT: SetOperation.SetOpType.ValueType  # 3
+
+    LEFT_INPUT_FIELD_NUMBER: builtins.int
+    RIGHT_INPUT_FIELD_NUMBER: builtins.int
+    SET_OP_TYPE_FIELD_NUMBER: builtins.int
+    IS_ALL_FIELD_NUMBER: builtins.int
+    BY_NAME_FIELD_NUMBER: builtins.int
+    @property
+    def left_input(self) -> global___Relation: ...
+    @property
+    def right_input(self) -> global___Relation: ...
+    set_op_type: global___SetOperation.SetOpType.ValueType
+    is_all: builtins.bool
+    by_name: builtins.bool
     def __init__(
         self,
         *,
-        inputs: collections.abc.Iterable[global___Relation] | None = ...,
-        union_type: global___Union.UnionType.ValueType = ...,
+        left_input: global___Relation | None = ...,
+        right_input: global___Relation | None = ...,
+        set_op_type: global___SetOperation.SetOpType.ValueType = ...,
+        is_all: builtins.bool = ...,
+        by_name: builtins.bool = ...,
     ) -> None: ...
+    def HasField(
+        self,
+        field_name: typing_extensions.Literal[
+            "left_input", b"left_input", "right_input", b"right_input"
+        ],
+    ) -> builtins.bool: ...
     def ClearField(
         self,
-        field_name: typing_extensions.Literal["inputs", b"inputs", 
"union_type", b"union_type"],
+        field_name: typing_extensions.Literal[
+            "by_name",
+            b"by_name",
+            "is_all",
+            b"is_all",
+            "left_input",
+            b"left_input",
+            "right_input",
+            b"right_input",
+            "set_op_type",
+            b"set_op_type",
+        ],
     ) -> None: ...
 
-global___Union = Union
+global___SetOperation = SetOperation
 
 class Limit(google.protobuf.message.Message):
     """Relation of type [[Limit]] that is used to `limit` rows from the input 
relation."""

