This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 7c741c8c25fc [SPARK-46265][CONNECT] Assertions in AddArtifact RPC make the connect client incompatible with older clusters 7c741c8c25fc is described below commit 7c741c8c25fc4fe3d7d5fa5d490bb9b08debd952 Author: Niranjan Jayakar <n...@databricks.com> AuthorDate: Tue Dec 5 12:43:48 2023 -0400 [SPARK-46265][CONNECT] Assertions in AddArtifact RPC make the connect client incompatible with older clusters ### What changes were proposed in this pull request? A previous commit - d9c5f9d6 - updated the response of the `AddArtifact` RPC to return the session id. It also added an assertion to the client of this RPC that the session id returned by the server matches the session id that was requested. However, a connect client that includes this new assertion may connect to a Spark cluster that does not yet have this change, which makes the change backwards incompatible. Loosen the assertion to allow an empty session id in the RPC's response. ### Why are the changes needed? So that newer connect clients can connect to older Spark clusters. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually started a connect cluster using `spark-connect-shell` and connected to it with the connect client shell via `spark-connect-scala-client`. Changed the service to not return session id and verified the fix. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44181 from nija-at/artifact-sessionid. 
Authored-by: Niranjan Jayakar <n...@databricks.com> Signed-off-by: Herman van Hovell <her...@databricks.com> --- .../org/apache/spark/sql/connect/client/ArtifactManager.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala index 7a6eb963cb33..3cd35803d1ec 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala @@ -125,7 +125,9 @@ class ArtifactManager( .addAllNames(Arrays.asList(artifactName)) .build() val response = bstub.artifactStatus(request) - if (response.getSessionId != sessionId) { + if (StringUtils.isNotEmpty(response.getSessionId) && response.getSessionId != sessionId) { + // In older versions of the Spark cluster, the session ID is not set in the response. + // Ignore this check to keep compatibility. throw new IllegalStateException( s"Session ID mismatch: $sessionId != ${response.getSessionId}") } @@ -185,7 +187,9 @@ class ArtifactManager( val responseHandler = new StreamObserver[proto.AddArtifactsResponse] { private val summaries = mutable.Buffer.empty[ArtifactSummary] override def onNext(v: AddArtifactsResponse): Unit = { - if (v.getSessionId != sessionId) { + if (StringUtils.isNotEmpty(v.getSessionId) && v.getSessionId != sessionId) { + // In older versions of the Spark cluster, the session ID is not set in the response. + // Ignore this check to keep compatibility. 
throw new IllegalStateException(s"Session ID mismatch: $sessionId != ${v.getSessionId}") } v.getArtifactsList.forEach { summary => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org