This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c741c8c25fc [SPARK-46265][CONNECT] Assertions in AddArtifact RPC make the connect client incompatible with older clusters
7c741c8c25fc is described below

commit 7c741c8c25fc4fe3d7d5fa5d490bb9b08debd952
Author: Niranjan Jayakar <n...@databricks.com>
AuthorDate: Tue Dec 5 12:43:48 2023 -0400

    [SPARK-46265][CONNECT] Assertions in AddArtifact RPC make the connect client incompatible with older clusters
    
    ### What changes were proposed in this pull request?
    
    A previous commit - d9c5f9d6 - updated the response of the
    `AddArtifact` RPC to return the session id. It also added an
    assertion on the client side of this RPC that the session id
    returned by the server matches the session id that was requested.
    
    However, a connect client that includes this new assertion may
    connect to a Spark cluster that does not yet have this change,
    making the change backwards incompatible.
    
    This patch loosens the assertion to allow an empty session id in
    the RPC's response.
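    
    As an illustration only (not part of the actual patch; the helper
    name below is hypothetical), the loosened check amounts to treating
    an empty session id from the server as "unknown, likely an older
    cluster" rather than as a mismatch:
    
        // Illustrative sketch: only a non-empty session id that differs from the
        // one the client requested is treated as an error; an empty id is tolerated.
        private def validateSessionId(returned: String, expected: String): Unit = {
          if (returned.nonEmpty && returned != expected) {
            throw new IllegalStateException(s"Session ID mismatch: $expected != $returned")
          }
        }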
    
    ### Why are the changes needed?
    
    So that newer connect clients can still connect to older Spark clusters.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manually started a connect cluster using `spark-connect-shell`
    and connected to it with the connect client shell via
    `spark-connect-scala-client`. Changed the service to not return
    the session id and verified the fix.
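    
    For reference, one way to exercise the `AddArtifact` path from the
    Scala client shell is to add a local artifact (a sketch; the jar
    path is a placeholder):
    
        // With the fix, a server that omits the session id in its AddArtifacts
        // response no longer triggers the IllegalStateException on the client.
        spark.addArtifact("/tmp/example.jar")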
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44181 from nija-at/artifact-sessionid.
    
    Authored-by: Niranjan Jayakar <n...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../org/apache/spark/sql/connect/client/ArtifactManager.scala     | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
index 7a6eb963cb33..3cd35803d1ec 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
@@ -125,7 +125,9 @@ class ArtifactManager(
       .addAllNames(Arrays.asList(artifactName))
       .build()
     val response = bstub.artifactStatus(request)
-    if (response.getSessionId != sessionId) {
+    if (StringUtils.isNotEmpty(response.getSessionId) && response.getSessionId != sessionId) {
+      // In older versions of the Spark cluster, the session ID is not set in the response.
+      // Ignore this check to keep compatibility.
       throw new IllegalStateException(
         s"Session ID mismatch: $sessionId != ${response.getSessionId}")
     }
@@ -185,7 +187,9 @@ class ArtifactManager(
     val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
       private val summaries = mutable.Buffer.empty[ArtifactSummary]
       override def onNext(v: AddArtifactsResponse): Unit = {
-        if (v.getSessionId != sessionId) {
+        if (StringUtils.isNotEmpty(v.getSessionId) && v.getSessionId != sessionId) {
+          // In older versions of the Spark cluster, the session ID is not set in the response.
+          // Ignore this check to keep compatibility.
           throw new IllegalStateException(s"Session ID mismatch: $sessionId != ${v.getSessionId}")
         }
         v.getArtifactsList.forEach { summary =>

