This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 400d4ded64e [SPARK-43662][PS][CONNECT] Support merge_asof in Spark Connect
400d4ded64e is described below

commit 400d4ded64efdf62b75f2bcdc6025d474d6395ee
Author: Takuya UESHIN <ues...@databricks.com>
AuthorDate: Wed Sep 27 15:09:47 2023 +0800

    [SPARK-43662][PS][CONNECT] Support merge_asof in Spark Connect
    
    ### What changes were proposed in this pull request?
    
    Supports `merge_asof` in Spark Connect.
    
    ### Why are the changes needed?
    
    `merge_asof` is missing in Spark Connect.
    
    Ref: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.merge_asof.html
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, `merge_asof` is available in Spark Connect.
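
    For example, a minimal sketch against a Spark Connect session (the data
    is taken from the `merge_asof` docs, not from this patch):

    ```py
    import pyspark.pandas as ps

    left = ps.DataFrame({"a": [1, 5, 10], "left_val": ["a", "b", "c"]})
    right = ps.DataFrame({"a": [1, 2, 3, 6, 7], "right_val": [1, 2, 3, 6, 7]})

    # For each left row, match the nearest `a` in `right` that is less than
    # or equal to the left key (direction="backward" by default).
    ps.merge_asof(left, right, on="a")
    ```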
    
    ### How was this patch tested?
    
    The parity tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43137 from ueshin/issues/SPARK-43662/merge_asof.
    
    Authored-by: Takuya UESHIN <ues...@databricks.com>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 .../main/protobuf/spark/connect/relations.proto    |  42 ++++
 .../sql/connect/planner/SparkConnectPlanner.scala  |  37 +++
 .../pandas/tests/connect/test_parity_reshape.py    |   4 +-
 python/pyspark/sql/connect/dataframe.py            |  41 +++
 python/pyspark/sql/connect/plan.py                 |  94 +++++++
 python/pyspark/sql/connect/proto/relations_pb2.py  | 278 +++++++++++----------
 python/pyspark/sql/connect/proto/relations_pb2.pyi | 123 +++++++++
 python/pyspark/sql/dataframe.py                    |   3 +
 8 files changed, 481 insertions(+), 141 deletions(-)

diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index 0cf08431d46..deb33978386 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -72,6 +72,7 @@ message Relation {
     CachedLocalRelation cached_local_relation = 36;
     CachedRemoteRelation cached_remote_relation = 37;
    CommonInlineUserDefinedTableFunction common_inline_user_defined_table_function = 38;
+    AsOfJoin as_of_join = 39;
 
     // NA functions
     NAFill fill_na = 90;
@@ -1009,3 +1010,44 @@ message Parse {
     PARSE_FORMAT_JSON = 2;
   }
 }
+
+// Relation of type [[AsOfJoin]].
+//
+// `left` and `right` must be present.
+message AsOfJoin {
+  // (Required) Left input relation for a Join.
+  Relation left = 1;
+
+  // (Required) Right input relation for a Join.
+  Relation right = 2;
+
+  // (Required) Field to join on in the left DataFrame.
+  Expression left_as_of = 3;
+
+  // (Required) Field to join on in the right DataFrame.
+  Expression right_as_of = 4;
+
+  // (Optional) The join condition. Could be unset when `using_columns` is utilized.
+  //
+  // This field does not co-exist with using_columns.
+  Expression join_expr = 5;
+
+  // (Optional) using_columns provides a list of columns that should be present on both sides of
+  // the join inputs that this Join will join on. For example, A JOIN B USING col_name is
+  // equivalent to A JOIN B ON A.col_name = B.col_name.
+  //
+  // This field does not co-exist with join_expr.
+  repeated string using_columns = 6;
+
+  // (Required) The join type.
+  string join_type = 7;
+
+  // (Optional) The as-of tolerance: match only within this range.
+  Expression tolerance = 8;
+
+  // (Required) Whether to allow matching with the same value or not.
+  bool allow_exact_matches = 9;
+
+  // (Required) Whether to search for prior, subsequent, or closest matches.
+  string direction = 10;
+}
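
For illustration only (not part of the diff): a hedged sketch of how a Python client
could populate the new message through the generated bindings. The
`unresolved_attribute` form comes from expressions.proto; the column names are made up.

```py
from pyspark.sql.connect.proto import relations_pb2

as_of = relations_pb2.AsOfJoin()
as_of.left_as_of.unresolved_attribute.unparsed_identifier = "ts"   # as-of key, left side
as_of.right_as_of.unresolved_attribute.unparsed_identifier = "ts"  # as-of key, right side
as_of.using_columns.append("id")  # equi-join key; mutually exclusive with join_expr
as_of.join_type = "left"
as_of.allow_exact_matches = True
as_of.direction = "backward"
```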
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index dda7a713fa0..e6b5d89e1ab 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -110,6 +110,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
      case proto.Relation.RelTypeCase.OFFSET => transformOffset(rel.getOffset)
      case proto.Relation.RelTypeCase.TAIL => transformTail(rel.getTail)
      case proto.Relation.RelTypeCase.JOIN => transformJoinOrJoinWith(rel.getJoin)
+      case proto.Relation.RelTypeCase.AS_OF_JOIN => transformAsOfJoin(rel.getAsOfJoin)
      case proto.Relation.RelTypeCase.DEDUPLICATE => transformDeduplicate(rel.getDeduplicate)
      case proto.Relation.RelTypeCase.SET_OP => transformSetOperation(rel.getSetOp)
      case proto.Relation.RelTypeCase.SORT => transformSort(rel.getSort)
       case proto.Relation.RelTypeCase.SORT => transformSort(rel.getSort)
@@ -2275,6 +2276,42 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
     }
   }
 
+  private def transformAsOfJoin(rel: proto.AsOfJoin): LogicalPlan = {
+    val left = Dataset.ofRows(session, transformRelation(rel.getLeft))
+    val right = Dataset.ofRows(session, transformRelation(rel.getRight))
+    val leftAsOf = Column(transformExpression(rel.getLeftAsOf))
+    val rightAsOf = Column(transformExpression(rel.getRightAsOf))
+    val joinType = rel.getJoinType
+    val tolerance = if (rel.hasTolerance) Column(transformExpression(rel.getTolerance)) else null
+    val allowExactMatches = rel.getAllowExactMatches
+    val direction = rel.getDirection
+
+    val joined = if (rel.getUsingColumnsCount > 0) {
+      val usingColumns = rel.getUsingColumnsList.asScala.toSeq
+      left.joinAsOf(
+        other = right,
+        leftAsOf = leftAsOf,
+        rightAsOf = rightAsOf,
+        usingColumns = usingColumns,
+        joinType = joinType,
+        tolerance = tolerance,
+        allowExactMatches = allowExactMatches,
+        direction = direction)
+    } else {
+      val joinExprs = if (rel.hasJoinExpr) Column(transformExpression(rel.getJoinExpr)) else null
+      left.joinAsOf(
+        other = right,
+        leftAsOf = leftAsOf,
+        rightAsOf = rightAsOf,
+        joinExprs = joinExprs,
+        joinType = joinType,
+        tolerance = tolerance,
+        allowExactMatches = allowExactMatches,
+        direction = direction)
+    }
+    joined.logicalPlan
+  }
+
   private def transformSort(sort: proto.Sort): LogicalPlan = {
     assert(sort.getOrderCount > 0, "'order' must be present and contain elements.")
     logical.Sort(
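
For illustration (DataFrames and column names below are placeholders, not from this
patch): the two branches above correspond to the column-name and expression forms of
the user-level API, roughly:

```py
# using_columns path: equi-join keys given by name
left._joinAsOf(right, leftAsOfColumn="ts", rightAsOfColumn="ts", on=["id"])

# join_expr path: an arbitrary boolean Column as the join condition
left._joinAsOf(right, leftAsOfColumn="ts", rightAsOfColumn="ts",
               on=left["id"] == right["id"])
```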
diff --git a/python/pyspark/pandas/tests/connect/test_parity_reshape.py b/python/pyspark/pandas/tests/connect/test_parity_reshape.py
index 0773978ba4b..356baaff5ba 100644
--- a/python/pyspark/pandas/tests/connect/test_parity_reshape.py
+++ b/python/pyspark/pandas/tests/connect/test_parity_reshape.py
@@ -22,9 +22,7 @@ from pyspark.testing.pandasutils import PandasOnSparkTestUtils
 
 
 class ReshapeParityTests(ReshapeTestsMixin, PandasOnSparkTestUtils, ReusedConnectTestCase):
-    @unittest.skip("TODO(SPARK-43662): Enable ReshapeParityTests.test_merge_asof.")
-    def test_merge_asof(self):
-        super().test_merge_asof()
+    pass
 
 
 if __name__ == "__main__":
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 9cc1ddead33..5e2623336a2 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -594,6 +594,47 @@ class DataFrame:
 
     join.__doc__ = PySparkDataFrame.join.__doc__
 
+    def _joinAsOf(
+        self,
+        other: "DataFrame",
+        leftAsOfColumn: Union[str, Column],
+        rightAsOfColumn: Union[str, Column],
+        on: Optional[Union[str, List[str], Column, List[Column]]] = None,
+        how: Optional[str] = None,
+        *,
+        tolerance: Optional[Column] = None,
+        allowExactMatches: bool = True,
+        direction: str = "backward",
+    ) -> "DataFrame":
+        if self._plan is None:
+            raise Exception("Cannot join when self._plan is empty.")
+        if other._plan is None:
+            raise Exception("Cannot join when other._plan is empty.")
+
+        if how is None:
+            how = "inner"
+        assert isinstance(how, str), "how should be a string"
+
+        if tolerance is not None:
+            assert isinstance(tolerance, Column), "tolerance should be Column"
+
+        return DataFrame.withPlan(
+            plan.AsOfJoin(
+                left=self._plan,
+                right=other._plan,
+                left_as_of=leftAsOfColumn,
+                right_as_of=rightAsOfColumn,
+                on=on,
+                how=how,
+                tolerance=tolerance,
+                allow_exact_matches=allowExactMatches,
+                direction=direction,
+            ),
+            session=self._session,
+        )
+
+    _joinAsOf.__doc__ = PySparkDataFrame._joinAsOf.__doc__
+
     def limit(self, n: int) -> "DataFrame":
         return DataFrame.withPlan(plan.Limit(child=self._plan, limit=n), session=self._session)
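
A hedged usage sketch of the `_joinAsOf` method above (`spark` is an assumed Connect
session; the data mirrors the `_joinAsOf` docs). `_joinAsOf` is an internal API,
called under the hood by pandas-on-Spark:

```py
from pyspark.sql import functions as F

left = spark.createDataFrame([(1, "a"), (5, "b"), (10, "c")], ["a", "left_val"])
right = spark.createDataFrame([(1, 1), (2, 2), (3, 3), (6, 6), (7, 7)], ["a", "right_val"])

# Nearest match strictly before each left key (no exact matches), at most 1 apart.
left._joinAsOf(
    right, leftAsOfColumn="a", rightAsOfColumn="a",
    tolerance=F.lit(1), allowExactMatches=False, direction="backward",
).show()
```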
 
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index 6758b3673f3..10565b9965a 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -900,6 +900,100 @@ class Join(LogicalPlan):
         """
 
 
+class AsOfJoin(LogicalPlan):
+    def __init__(
+        self,
+        left: LogicalPlan,
+        right: LogicalPlan,
+        left_as_of: "ColumnOrName",
+        right_as_of: "ColumnOrName",
+        on: Optional[Union[str, List[str], Column, List[Column]]],
+        how: str,
+        tolerance: Optional[Column],
+        allow_exact_matches: bool,
+        direction: str,
+    ) -> None:
+        super().__init__(left)
+        self.left = left
+        self.right = right
+        self.left_as_of = left_as_of
+        self.right_as_of = right_as_of
+        self.on = on
+        self.how = how
+        self.tolerance = tolerance
+        self.allow_exact_matches = allow_exact_matches
+        self.direction = direction
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = self._create_proto_relation()
+        plan.as_of_join.left.CopyFrom(self.left.plan(session))
+        plan.as_of_join.right.CopyFrom(self.right.plan(session))
+
+        if isinstance(self.left_as_of, Column):
+            plan.as_of_join.left_as_of.CopyFrom(self.left_as_of.to_plan(session))
+        else:
+            plan.as_of_join.left_as_of.CopyFrom(
+                ColumnReference(self.left_as_of, self.left._plan_id).to_plan(session)
+            )
+
+        if isinstance(self.right_as_of, Column):
+            plan.as_of_join.right_as_of.CopyFrom(self.right_as_of.to_plan(session))
+        else:
+            plan.as_of_join.right_as_of.CopyFrom(
+                ColumnReference(self.right_as_of, self.right._plan_id).to_plan(session)
+            )
+
+        if self.on is not None:
+            if not isinstance(self.on, list):
+                if isinstance(self.on, str):
+                    plan.as_of_join.using_columns.append(self.on)
+                else:
+                    plan.as_of_join.join_expr.CopyFrom(self.on.to_plan(session))
+            elif len(self.on) > 0:
+                if isinstance(self.on[0], str):
+                    plan.as_of_join.using_columns.extend(cast(List[str], self.on))
+                else:
+                    merge_column = functools.reduce(lambda c1, c2: c1 & c2, self.on)
+                    plan.as_of_join.join_expr.CopyFrom(cast(Column, merge_column).to_plan(session))
+
+        plan.as_of_join.join_type = self.how
+
+        if self.tolerance is not None:
+            plan.as_of_join.tolerance.CopyFrom(self.tolerance.to_plan(session))
+
+        plan.as_of_join.allow_exact_matches = self.allow_exact_matches
+        plan.as_of_join.direction = self.direction
+
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        assert self.left is not None
+        assert self.right is not None
+
+        i = " " * indent
+        o = " " * (indent + LogicalPlan.INDENT)
+        n = indent + LogicalPlan.INDENT * 2
+        return (
+            f"{i}<AsOfJoin left_as_of={self.left_as_of}, 
right_as_of={self.right_as_of}, "
+            f"on={self.on} how={self.how}>\n{o}"
+            f"left=\n{self.left.print(n)}\n{o}right=\n{self.right.print(n)}"
+        )
+
+    def _repr_html_(self) -> str:
+        assert self.left is not None
+        assert self.right is not None
+
+        return f"""
+        <ul>
+            <li>
+                <b>AsOfJoin</b><br />
+                Left: {self.left._repr_html_()}
+                Right: {self.right._repr_html_()}
+            </li>
+        </ul>
+        """
+
+
 class SetOperation(LogicalPlan):
     def __init__(
         self,
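
The `functools.reduce` over `&` in `AsOfJoin.plan` above merges a list of join Columns
into one conjunction before it is serialized as `join_expr`; a standalone sketch
(column names are illustrative):

```py
import functools
from pyspark.sql import functions as F

conds = [F.col("a") == F.col("b"), F.col("c") == F.col("d")]
merged = functools.reduce(lambda c1, c2: c1 & c2, conds)
# merged is a single Column equivalent to ((a = b) AND (c = d))
```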
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py b/python/pyspark/sql/connect/proto/relations_pb2.py
index ebb2682c619..fc70cdea402 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.py
+++ b/python/pyspark/sql/connect/proto/relations_pb2.py
@@ -35,7 +35,7 @@ from pyspark.sql.connect.proto import catalog_pb2 as spark_dot_connect_dot_catal
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\xe1\x18\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
+    b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\x9a\x19\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
 )
 
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
@@ -62,141 +62,143 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _PARSE_OPTIONSENTRY._options = None
     _PARSE_OPTIONSENTRY._serialized_options = b"8\001"
     _RELATION._serialized_start = 165
-    _RELATION._serialized_end = 3334
-    _UNKNOWN._serialized_start = 3336
-    _UNKNOWN._serialized_end = 3345
-    _RELATIONCOMMON._serialized_start = 3347
-    _RELATIONCOMMON._serialized_end = 3438
-    _SQL._serialized_start = 3441
-    _SQL._serialized_end = 3919
-    _SQL_ARGSENTRY._serialized_start = 3735
-    _SQL_ARGSENTRY._serialized_end = 3825
-    _SQL_NAMEDARGUMENTSENTRY._serialized_start = 3827
-    _SQL_NAMEDARGUMENTSENTRY._serialized_end = 3919
-    _READ._serialized_start = 3922
-    _READ._serialized_end = 4585
-    _READ_NAMEDTABLE._serialized_start = 4100
-    _READ_NAMEDTABLE._serialized_end = 4292
-    _READ_NAMEDTABLE_OPTIONSENTRY._serialized_start = 4234
-    _READ_NAMEDTABLE_OPTIONSENTRY._serialized_end = 4292
-    _READ_DATASOURCE._serialized_start = 4295
-    _READ_DATASOURCE._serialized_end = 4572
-    _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 4234
-    _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 4292
-    _PROJECT._serialized_start = 4587
-    _PROJECT._serialized_end = 4704
-    _FILTER._serialized_start = 4706
-    _FILTER._serialized_end = 4818
-    _JOIN._serialized_start = 4821
-    _JOIN._serialized_end = 5482
-    _JOIN_JOINDATATYPE._serialized_start = 5160
-    _JOIN_JOINDATATYPE._serialized_end = 5252
-    _JOIN_JOINTYPE._serialized_start = 5255
-    _JOIN_JOINTYPE._serialized_end = 5463
-    _SETOPERATION._serialized_start = 5485
-    _SETOPERATION._serialized_end = 5964
-    _SETOPERATION_SETOPTYPE._serialized_start = 5801
-    _SETOPERATION_SETOPTYPE._serialized_end = 5915
-    _LIMIT._serialized_start = 5966
-    _LIMIT._serialized_end = 6042
-    _OFFSET._serialized_start = 6044
-    _OFFSET._serialized_end = 6123
-    _TAIL._serialized_start = 6125
-    _TAIL._serialized_end = 6200
-    _AGGREGATE._serialized_start = 6203
-    _AGGREGATE._serialized_end = 6785
-    _AGGREGATE_PIVOT._serialized_start = 6542
-    _AGGREGATE_PIVOT._serialized_end = 6653
-    _AGGREGATE_GROUPTYPE._serialized_start = 6656
-    _AGGREGATE_GROUPTYPE._serialized_end = 6785
-    _SORT._serialized_start = 6788
-    _SORT._serialized_end = 6948
-    _DROP._serialized_start = 6951
-    _DROP._serialized_end = 7092
-    _DEDUPLICATE._serialized_start = 7095
-    _DEDUPLICATE._serialized_end = 7335
-    _LOCALRELATION._serialized_start = 7337
-    _LOCALRELATION._serialized_end = 7426
-    _CACHEDLOCALRELATION._serialized_start = 7428
-    _CACHEDLOCALRELATION._serialized_end = 7500
-    _CACHEDREMOTERELATION._serialized_start = 7502
-    _CACHEDREMOTERELATION._serialized_end = 7557
-    _SAMPLE._serialized_start = 7560
-    _SAMPLE._serialized_end = 7833
-    _RANGE._serialized_start = 7836
-    _RANGE._serialized_end = 7981
-    _SUBQUERYALIAS._serialized_start = 7983
-    _SUBQUERYALIAS._serialized_end = 8097
-    _REPARTITION._serialized_start = 8100
-    _REPARTITION._serialized_end = 8242
-    _SHOWSTRING._serialized_start = 8245
-    _SHOWSTRING._serialized_end = 8387
-    _HTMLSTRING._serialized_start = 8389
-    _HTMLSTRING._serialized_end = 8503
-    _STATSUMMARY._serialized_start = 8505
-    _STATSUMMARY._serialized_end = 8597
-    _STATDESCRIBE._serialized_start = 8599
-    _STATDESCRIBE._serialized_end = 8680
-    _STATCROSSTAB._serialized_start = 8682
-    _STATCROSSTAB._serialized_end = 8783
-    _STATCOV._serialized_start = 8785
-    _STATCOV._serialized_end = 8881
-    _STATCORR._serialized_start = 8884
-    _STATCORR._serialized_end = 9021
-    _STATAPPROXQUANTILE._serialized_start = 9024
-    _STATAPPROXQUANTILE._serialized_end = 9188
-    _STATFREQITEMS._serialized_start = 9190
-    _STATFREQITEMS._serialized_end = 9315
-    _STATSAMPLEBY._serialized_start = 9318
-    _STATSAMPLEBY._serialized_end = 9627
-    _STATSAMPLEBY_FRACTION._serialized_start = 9519
-    _STATSAMPLEBY_FRACTION._serialized_end = 9618
-    _NAFILL._serialized_start = 9630
-    _NAFILL._serialized_end = 9764
-    _NADROP._serialized_start = 9767
-    _NADROP._serialized_end = 9901
-    _NAREPLACE._serialized_start = 9904
-    _NAREPLACE._serialized_end = 10200
-    _NAREPLACE_REPLACEMENT._serialized_start = 10059
-    _NAREPLACE_REPLACEMENT._serialized_end = 10200
-    _TODF._serialized_start = 10202
-    _TODF._serialized_end = 10290
-    _WITHCOLUMNSRENAMED._serialized_start = 10293
-    _WITHCOLUMNSRENAMED._serialized_end = 10532
-    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 10465
-    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 10532
-    _WITHCOLUMNS._serialized_start = 10534
-    _WITHCOLUMNS._serialized_end = 10653
-    _WITHWATERMARK._serialized_start = 10656
-    _WITHWATERMARK._serialized_end = 10790
-    _HINT._serialized_start = 10793
-    _HINT._serialized_end = 10925
-    _UNPIVOT._serialized_start = 10928
-    _UNPIVOT._serialized_end = 11255
-    _UNPIVOT_VALUES._serialized_start = 11185
-    _UNPIVOT_VALUES._serialized_end = 11244
-    _TOSCHEMA._serialized_start = 11257
-    _TOSCHEMA._serialized_end = 11363
-    _REPARTITIONBYEXPRESSION._serialized_start = 11366
-    _REPARTITIONBYEXPRESSION._serialized_end = 11569
-    _MAPPARTITIONS._serialized_start = 11572
-    _MAPPARTITIONS._serialized_end = 11753
-    _GROUPMAP._serialized_start = 11756
-    _GROUPMAP._serialized_end = 12391
-    _COGROUPMAP._serialized_start = 12394
-    _COGROUPMAP._serialized_end = 12920
-    _APPLYINPANDASWITHSTATE._serialized_start = 12923
-    _APPLYINPANDASWITHSTATE._serialized_end = 13280
-    _COMMONINLINEUSERDEFINEDTABLEFUNCTION._serialized_start = 13283
-    _COMMONINLINEUSERDEFINEDTABLEFUNCTION._serialized_end = 13527
-    _PYTHONUDTF._serialized_start = 13530
-    _PYTHONUDTF._serialized_end = 13707
-    _COLLECTMETRICS._serialized_start = 13710
-    _COLLECTMETRICS._serialized_end = 13846
-    _PARSE._serialized_start = 13849
-    _PARSE._serialized_end = 14237
-    _PARSE_OPTIONSENTRY._serialized_start = 4234
-    _PARSE_OPTIONSENTRY._serialized_end = 4292
-    _PARSE_PARSEFORMAT._serialized_start = 14138
-    _PARSE_PARSEFORMAT._serialized_end = 14226
+    _RELATION._serialized_end = 3391
+    _UNKNOWN._serialized_start = 3393
+    _UNKNOWN._serialized_end = 3402
+    _RELATIONCOMMON._serialized_start = 3404
+    _RELATIONCOMMON._serialized_end = 3495
+    _SQL._serialized_start = 3498
+    _SQL._serialized_end = 3976
+    _SQL_ARGSENTRY._serialized_start = 3792
+    _SQL_ARGSENTRY._serialized_end = 3882
+    _SQL_NAMEDARGUMENTSENTRY._serialized_start = 3884
+    _SQL_NAMEDARGUMENTSENTRY._serialized_end = 3976
+    _READ._serialized_start = 3979
+    _READ._serialized_end = 4642
+    _READ_NAMEDTABLE._serialized_start = 4157
+    _READ_NAMEDTABLE._serialized_end = 4349
+    _READ_NAMEDTABLE_OPTIONSENTRY._serialized_start = 4291
+    _READ_NAMEDTABLE_OPTIONSENTRY._serialized_end = 4349
+    _READ_DATASOURCE._serialized_start = 4352
+    _READ_DATASOURCE._serialized_end = 4629
+    _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 4291
+    _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 4349
+    _PROJECT._serialized_start = 4644
+    _PROJECT._serialized_end = 4761
+    _FILTER._serialized_start = 4763
+    _FILTER._serialized_end = 4875
+    _JOIN._serialized_start = 4878
+    _JOIN._serialized_end = 5539
+    _JOIN_JOINDATATYPE._serialized_start = 5217
+    _JOIN_JOINDATATYPE._serialized_end = 5309
+    _JOIN_JOINTYPE._serialized_start = 5312
+    _JOIN_JOINTYPE._serialized_end = 5520
+    _SETOPERATION._serialized_start = 5542
+    _SETOPERATION._serialized_end = 6021
+    _SETOPERATION_SETOPTYPE._serialized_start = 5858
+    _SETOPERATION_SETOPTYPE._serialized_end = 5972
+    _LIMIT._serialized_start = 6023
+    _LIMIT._serialized_end = 6099
+    _OFFSET._serialized_start = 6101
+    _OFFSET._serialized_end = 6180
+    _TAIL._serialized_start = 6182
+    _TAIL._serialized_end = 6257
+    _AGGREGATE._serialized_start = 6260
+    _AGGREGATE._serialized_end = 6842
+    _AGGREGATE_PIVOT._serialized_start = 6599
+    _AGGREGATE_PIVOT._serialized_end = 6710
+    _AGGREGATE_GROUPTYPE._serialized_start = 6713
+    _AGGREGATE_GROUPTYPE._serialized_end = 6842
+    _SORT._serialized_start = 6845
+    _SORT._serialized_end = 7005
+    _DROP._serialized_start = 7008
+    _DROP._serialized_end = 7149
+    _DEDUPLICATE._serialized_start = 7152
+    _DEDUPLICATE._serialized_end = 7392
+    _LOCALRELATION._serialized_start = 7394
+    _LOCALRELATION._serialized_end = 7483
+    _CACHEDLOCALRELATION._serialized_start = 7485
+    _CACHEDLOCALRELATION._serialized_end = 7557
+    _CACHEDREMOTERELATION._serialized_start = 7559
+    _CACHEDREMOTERELATION._serialized_end = 7614
+    _SAMPLE._serialized_start = 7617
+    _SAMPLE._serialized_end = 7890
+    _RANGE._serialized_start = 7893
+    _RANGE._serialized_end = 8038
+    _SUBQUERYALIAS._serialized_start = 8040
+    _SUBQUERYALIAS._serialized_end = 8154
+    _REPARTITION._serialized_start = 8157
+    _REPARTITION._serialized_end = 8299
+    _SHOWSTRING._serialized_start = 8302
+    _SHOWSTRING._serialized_end = 8444
+    _HTMLSTRING._serialized_start = 8446
+    _HTMLSTRING._serialized_end = 8560
+    _STATSUMMARY._serialized_start = 8562
+    _STATSUMMARY._serialized_end = 8654
+    _STATDESCRIBE._serialized_start = 8656
+    _STATDESCRIBE._serialized_end = 8737
+    _STATCROSSTAB._serialized_start = 8739
+    _STATCROSSTAB._serialized_end = 8840
+    _STATCOV._serialized_start = 8842
+    _STATCOV._serialized_end = 8938
+    _STATCORR._serialized_start = 8941
+    _STATCORR._serialized_end = 9078
+    _STATAPPROXQUANTILE._serialized_start = 9081
+    _STATAPPROXQUANTILE._serialized_end = 9245
+    _STATFREQITEMS._serialized_start = 9247
+    _STATFREQITEMS._serialized_end = 9372
+    _STATSAMPLEBY._serialized_start = 9375
+    _STATSAMPLEBY._serialized_end = 9684
+    _STATSAMPLEBY_FRACTION._serialized_start = 9576
+    _STATSAMPLEBY_FRACTION._serialized_end = 9675
+    _NAFILL._serialized_start = 9687
+    _NAFILL._serialized_end = 9821
+    _NADROP._serialized_start = 9824
+    _NADROP._serialized_end = 9958
+    _NAREPLACE._serialized_start = 9961
+    _NAREPLACE._serialized_end = 10257
+    _NAREPLACE_REPLACEMENT._serialized_start = 10116
+    _NAREPLACE_REPLACEMENT._serialized_end = 10257
+    _TODF._serialized_start = 10259
+    _TODF._serialized_end = 10347
+    _WITHCOLUMNSRENAMED._serialized_start = 10350
+    _WITHCOLUMNSRENAMED._serialized_end = 10589
+    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 10522
+    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 10589
+    _WITHCOLUMNS._serialized_start = 10591
+    _WITHCOLUMNS._serialized_end = 10710
+    _WITHWATERMARK._serialized_start = 10713
+    _WITHWATERMARK._serialized_end = 10847
+    _HINT._serialized_start = 10850
+    _HINT._serialized_end = 10982
+    _UNPIVOT._serialized_start = 10985
+    _UNPIVOT._serialized_end = 11312
+    _UNPIVOT_VALUES._serialized_start = 11242
+    _UNPIVOT_VALUES._serialized_end = 11301
+    _TOSCHEMA._serialized_start = 11314
+    _TOSCHEMA._serialized_end = 11420
+    _REPARTITIONBYEXPRESSION._serialized_start = 11423
+    _REPARTITIONBYEXPRESSION._serialized_end = 11626
+    _MAPPARTITIONS._serialized_start = 11629
+    _MAPPARTITIONS._serialized_end = 11810
+    _GROUPMAP._serialized_start = 11813
+    _GROUPMAP._serialized_end = 12448
+    _COGROUPMAP._serialized_start = 12451
+    _COGROUPMAP._serialized_end = 12977
+    _APPLYINPANDASWITHSTATE._serialized_start = 12980
+    _APPLYINPANDASWITHSTATE._serialized_end = 13337
+    _COMMONINLINEUSERDEFINEDTABLEFUNCTION._serialized_start = 13340
+    _COMMONINLINEUSERDEFINEDTABLEFUNCTION._serialized_end = 13584
+    _PYTHONUDTF._serialized_start = 13587
+    _PYTHONUDTF._serialized_end = 13764
+    _COLLECTMETRICS._serialized_start = 13767
+    _COLLECTMETRICS._serialized_end = 13903
+    _PARSE._serialized_start = 13906
+    _PARSE._serialized_end = 14294
+    _PARSE_OPTIONSENTRY._serialized_start = 4291
+    _PARSE_OPTIONSENTRY._serialized_end = 4349
+    _PARSE_PARSEFORMAT._serialized_start = 14195
+    _PARSE_PARSEFORMAT._serialized_end = 14283
+    _ASOFJOIN._serialized_start = 14297
+    _ASOFJOIN._serialized_end = 14772
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi b/python/pyspark/sql/connect/proto/relations_pb2.pyi
index fb4a3661764..5bca4f21b2e 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi
@@ -100,6 +100,7 @@ class Relation(google.protobuf.message.Message):
     CACHED_LOCAL_RELATION_FIELD_NUMBER: builtins.int
     CACHED_REMOTE_RELATION_FIELD_NUMBER: builtins.int
     COMMON_INLINE_USER_DEFINED_TABLE_FUNCTION_FIELD_NUMBER: builtins.int
+    AS_OF_JOIN_FIELD_NUMBER: builtins.int
     FILL_NA_FIELD_NUMBER: builtins.int
     DROP_NA_FIELD_NUMBER: builtins.int
     REPLACE_FIELD_NUMBER: builtins.int
@@ -193,6 +194,8 @@ class Relation(google.protobuf.message.Message):
         self,
     ) -> global___CommonInlineUserDefinedTableFunction: ...
     @property
+    def as_of_join(self) -> global___AsOfJoin: ...
+    @property
     def fill_na(self) -> global___NAFill:
         """NA functions"""
     @property
@@ -268,6 +271,7 @@ class Relation(google.protobuf.message.Message):
         cached_remote_relation: global___CachedRemoteRelation | None = ...,
        common_inline_user_defined_table_function: global___CommonInlineUserDefinedTableFunction
         | None = ...,
+        as_of_join: global___AsOfJoin | None = ...,
         fill_na: global___NAFill | None = ...,
         drop_na: global___NADrop | None = ...,
         replace: global___NAReplace | None = ...,
@@ -292,6 +296,8 @@ class Relation(google.protobuf.message.Message):
             b"apply_in_pandas_with_state",
             "approx_quantile",
             b"approx_quantile",
+            "as_of_join",
+            b"as_of_join",
             "cached_local_relation",
             b"cached_local_relation",
             "cached_remote_relation",
@@ -403,6 +409,8 @@ class Relation(google.protobuf.message.Message):
             b"apply_in_pandas_with_state",
             "approx_quantile",
             b"approx_quantile",
+            "as_of_join",
+            b"as_of_join",
             "cached_local_relation",
             b"cached_local_relation",
             "cached_remote_relation",
@@ -546,6 +554,7 @@ class Relation(google.protobuf.message.Message):
             "cached_local_relation",
             "cached_remote_relation",
             "common_inline_user_defined_table_function",
+            "as_of_join",
             "fill_na",
             "drop_na",
             "replace",
@@ -3672,3 +3681,117 @@ class Parse(google.protobuf.message.Message):
     ) -> typing_extensions.Literal["schema"] | None: ...
 
 global___Parse = Parse
+
+class AsOfJoin(google.protobuf.message.Message):
+    """Relation of type [[AsOfJoin]].
+
+    `left` and `right` must be present.
+    """
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    LEFT_FIELD_NUMBER: builtins.int
+    RIGHT_FIELD_NUMBER: builtins.int
+    LEFT_AS_OF_FIELD_NUMBER: builtins.int
+    RIGHT_AS_OF_FIELD_NUMBER: builtins.int
+    JOIN_EXPR_FIELD_NUMBER: builtins.int
+    USING_COLUMNS_FIELD_NUMBER: builtins.int
+    JOIN_TYPE_FIELD_NUMBER: builtins.int
+    TOLERANCE_FIELD_NUMBER: builtins.int
+    ALLOW_EXACT_MATCHES_FIELD_NUMBER: builtins.int
+    DIRECTION_FIELD_NUMBER: builtins.int
+    @property
+    def left(self) -> global___Relation:
+        """(Required) Left input relation for a Join."""
+    @property
+    def right(self) -> global___Relation:
+        """(Required) Right input relation for a Join."""
+    @property
+    def left_as_of(self) -> pyspark.sql.connect.proto.expressions_pb2.Expression:
+        """(Required) Field to join on in left DataFrame"""
+    @property
+    def right_as_of(self) -> pyspark.sql.connect.proto.expressions_pb2.Expression:
+        """(Required) Field to join on in right DataFrame"""
+    @property
+    def join_expr(self) -> pyspark.sql.connect.proto.expressions_pb2.Expression:
+        """(Optional) The join condition. Could be unset when `using_columns` is utilized.
+
+        This field does not co-exist with using_columns.
+        """
+    @property
+    def using_columns(
+        self,
+    ) -> 
google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]:
+        """Optional. using_columns provides a list of columns that should 
present on both sides of
+        the join inputs that this Join will join on. For example A JOIN B 
USING col_name is
+        equivalent to A JOIN B on A.col_name = B.col_name.
+
+        This field does not co-exist with join_condition.
+        """
+    join_type: builtins.str
+    """(Required) The join type."""
+    @property
+    def tolerance(self) -> pyspark.sql.connect.proto.expressions_pb2.Expression:
+        """(Optional) The as-of tolerance: match only within this range."""
+    allow_exact_matches: builtins.bool
+    """(Required) Whether allow matching with the same value or not."""
+    direction: builtins.str
+    """(Required) Whether to search for prior, subsequent, or closest 
matches."""
+    def __init__(
+        self,
+        *,
+        left: global___Relation | None = ...,
+        right: global___Relation | None = ...,
+        left_as_of: pyspark.sql.connect.proto.expressions_pb2.Expression | None = ...,
+        right_as_of: pyspark.sql.connect.proto.expressions_pb2.Expression | None = ...,
+        join_expr: pyspark.sql.connect.proto.expressions_pb2.Expression | None = ...,
+        using_columns: collections.abc.Iterable[builtins.str] | None = ...,
+        join_type: builtins.str = ...,
+        tolerance: pyspark.sql.connect.proto.expressions_pb2.Expression | None = ...,
+        allow_exact_matches: builtins.bool = ...,
+        direction: builtins.str = ...,
+    ) -> None: ...
+    def HasField(
+        self,
+        field_name: typing_extensions.Literal[
+            "join_expr",
+            b"join_expr",
+            "left",
+            b"left",
+            "left_as_of",
+            b"left_as_of",
+            "right",
+            b"right",
+            "right_as_of",
+            b"right_as_of",
+            "tolerance",
+            b"tolerance",
+        ],
+    ) -> builtins.bool: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal[
+            "allow_exact_matches",
+            b"allow_exact_matches",
+            "direction",
+            b"direction",
+            "join_expr",
+            b"join_expr",
+            "join_type",
+            b"join_type",
+            "left",
+            b"left",
+            "left_as_of",
+            b"left_as_of",
+            "right",
+            b"right",
+            "right_as_of",
+            b"right_as_of",
+            "tolerance",
+            b"tolerance",
+            "using_columns",
+            b"using_columns",
+        ],
+    ) -> None: ...
+
+global___AsOfJoin = AsOfJoin
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index bcdae5e40b9..51f18e7b6f3 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2751,6 +2751,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         This is similar to a left-join except that we match on the nearest
         key rather than equal keys.
 
+        .. versionchanged:: 4.0.0
+            Supports Spark Connect.
+
         Parameters
         ----------
         other : :class:`DataFrame`

