This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 1d43683c5f0 [SPARK-44637][CONNECT] Synchronize accesses to ExecuteResponseObserver
1d43683c5f0 is described below

commit 1d43683c5f0ad1aed25cfd9d4361fed866b3d1af
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Wed Aug 2 15:07:12 2023 -0400

    [SPARK-44637][CONNECT] Synchronize accesses to ExecuteResponseObserver
    
    ### What changes were proposed in this pull request?
    
    `removeResponsesUntilId` is called by the `ReleaseExecute` RPC and must be synchronized against `removeCachedResponses`, which runs from `consumeResponse` for the `ExecutePlan` or `ReattachExecute` RPC.
    
    In general, all public accesses to `ExecuteResponseObserver` should be synchronized.
    
    ### Why are the changes needed?
    
    Fixes a synchronization bug caught while testing the Python client.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Caught in https://github.com/apache/spark/pull/42235, but fixed separately because this is a server-side change.
    
    Closes #42299 from juliuszsompolski/SPARK-44637.
    
    Authored-by: Juliusz Sompolski <ju...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
    (cherry picked from commit 26c7e55f19993ef265b8730503c1ffa4ee697347)
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../apache/spark/sql/connect/execution/ExecuteResponseObserver.scala  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
index 5966e6cf0fc..8af0f72b8da 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
@@ -179,7 +179,7 @@ private[connect] class ExecuteResponseObserver[T <: MessageLite](val executeHold
   }
 
   /** Get the index in the stream for given response id. */
-  def getResponseIndexById(responseId: String): Long = {
+  def getResponseIndexById(responseId: String): Long = synchronized {
     responseIdToIndex.getOrElse(
       responseId,
       throw new SparkSQLException(
@@ -188,7 +188,7 @@ private[connect] class ExecuteResponseObserver[T <: MessageLite](val executeHold
   }
 
   /** Remove cached responses up to and including response with given id. */
-  def removeResponsesUntilId(responseId: String): Unit = {
+  def removeResponsesUntilId(responseId: String): Unit = synchronized {
     val index = getResponseIndexById(responseId)
     removeResponsesUntilIndex(index)
   }
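
For illustration only, the locking pattern applied by this patch can be sketched outside of Spark as follows. `CachedResponses`, `add`, `cachedCount`, and `SynchronizedObserverDemo` are hypothetical names invented for this sketch and are not part of Spark; only the two method names from the diff are reused, and their bodies here are simplified assumptions. The sketch relies on the fact that Scala's `synchronized` uses the instance's JVM monitor, which is reentrant, so `removeResponsesUntilId` can call the synchronized `getResponseIndexById` on the same object without deadlocking.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for ExecuteResponseObserver (not the Spark class).
class CachedResponses {
  private val responseIdToIndex = mutable.Map.empty[String, Long]
  private val responses = mutable.Map.empty[Long, String]
  private var lastIndex = 0L

  /** Cache a response and return its 1-based index in the stream. */
  def add(responseId: String, payload: String): Long = synchronized {
    lastIndex += 1
    responseIdToIndex(responseId) = lastIndex
    responses(lastIndex) = payload
    lastIndex
  }

  /** Get the index in the stream for the given response id. */
  def getResponseIndexById(responseId: String): Long = synchronized {
    responseIdToIndex.getOrElse(
      responseId,
      throw new NoSuchElementException(s"Response id not found: $responseId"))
  }

  /** Remove cached responses up to and including the response with the given id. */
  def removeResponsesUntilId(responseId: String): Unit = synchronized {
    // Reentrant: this thread already holds the monitor, so the nested call does not block.
    val index = getResponseIndexById(responseId)
    responses --= responses.keys.filter(_ <= index).toList
  }

  /** Number of responses currently cached. */
  def cachedCount: Int = synchronized { responses.size }
}

object SynchronizedObserverDemo {
  /** Runs a releaser and a producer concurrently and returns the final cache size. */
  def run(): Int = {
    val cache = new CachedResponses
    (1 to 1000).foreach(i => cache.add(s"r$i", s"payload-$i"))

    // One thread releases responses r1..r500 while another keeps adding new ones.
    val releaser = new Thread(() => cache.removeResponsesUntilId("r500"))
    val producer = new Thread(() =>
      (1001 to 2000).foreach(i => cache.add(s"r$i", s"payload-$i")))

    releaser.start()
    producer.start()
    releaser.join()
    producer.join()

    cache.cachedCount
  }
}
```

Because every public method takes the same monitor, the releasing thread and the producing thread never observe the maps mid-update, so the final cache size in this sketch does not depend on how the two threads interleave.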

