This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 74f6abef94c [SPARK-44625][CONNECT] SparkConnectExecutionManager to 
track all executions
74f6abef94c is described below

commit 74f6abef94c4cfb12fe36f8050138780bd669652
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Fri Aug 11 18:32:07 2023 +0200

    [SPARK-44625][CONNECT] SparkConnectExecutionManager to track all executions
    
    ### What changes were proposed in this pull request?
    
    SparkConnectExecutionManager tracks all executions (ExecuteHolder) in all sessions of
    Spark Connect. It tracks which executions have RPCs (ExecuteGrpcResponseSender) attached
    to them. If an execution gets abandoned (the client neither clears it with ReleaseExecute
    nor attaches a new RPC), it is automatically interrupted and removed after a timeout.
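    
    The detached timeout, the maintenance interval, and the tombstone cache size are controlled
    by new internal configs added in this PR. As a rough illustration of how they could be tuned
    (the config keys and defaults come from Connect.scala in this change; overriding them as shown
    here is only an example, not something this PR requires):
    ```
    // Hedged sketch: override the new SparkConnectExecutionManager configs (values shown are the defaults).
    val conf = new org.apache.spark.SparkConf()
      // time an execution may stay without any attached RPC before it is considered abandoned
      .set("spark.connect.execute.manager.detachedTimeout", "5m")
      // how often the maintenance task scans for abandoned executions
      .set("spark.connect.execute.manager.maintenanceInterval", "30s")
      // how many tombstones of abandoned executions are kept to reject retries of removed operations
      .set("spark.connect.execute.manager.abandonedTombstonesSize", "10000")
    ```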
    
    Note on the buf breaking-change check failure:
    ```
    Error: Field "2" on message "ReleaseExecuteResponse" moved from outside to 
inside a oneof.
    Error: buf found 1 breaking changes.
    ```
    The message ReleaseExecuteResponse has not been released yet, so it's not a 
breaking change compared to any released version.
    
    ### Why are the changes needed?
    
    The SparkConnectExecutionManager is needed to track reattachable executions that get
    abandoned by the client.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Pending, but the SparkConnectExecutionManager allows tests to inspect execution state,
    which makes it possible to write more unit tests for reattachable execution.
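    
    For illustration, a server-side test could inspect the manager roughly like this (a hedged
    sketch only; `opId` and the ScalaTest-style assertions are assumptions, not part of this PR):
    ```
    // Sketch: assumes an execution with operationId `opId` was started and its RPC has detached.
    val manager = SparkConnectService.executionManager
    manager.listActiveExecutions match {
      case Right(infos) => assert(infos.exists(_.operationId == opId)) // execution is tracked
      case Left(lastRemovalTime) => fail(s"no active executions since $lastRemovalTime")
    }
    // Force the abandonment path with a zero timeout and check the tombstone cache.
    manager.periodicMaintenance(timeout = 0L)
    assert(manager.listAbandonedExecutions.exists(_.operationId == opId))
    ```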
    
    Closes #42423 from juliuszsompolski/SPARK-44625.
    
    Authored-by: Juliusz Sompolski <ju...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
    (cherry picked from commit 84dbe846b30d5250169b834b182779a104570888)
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../src/main/resources/error/error-classes.json    |   5 +
 .../src/main/protobuf/spark/connect/base.proto     |   8 +-
 .../apache/spark/sql/connect/config/Connect.scala  |  24 +++
 .../execution/ExecuteGrpcResponseSender.scala      | 119 ++++++------
 .../execution/ExecuteResponseObserver.scala        |   9 +-
 .../spark/sql/connect/service/ExecuteHolder.scala  | 143 +++++++++++---
 .../spark/sql/connect/service/SessionHolder.scala  |  39 ++--
 .../service/SparkConnectExecutePlanHandler.scala   |  19 +-
 .../service/SparkConnectExecutionManager.scala     | 209 +++++++++++++++++++++
 .../SparkConnectReattachExecuteHandler.scala       |  31 ++-
 .../SparkConnectReleaseExecuteHandler.scala        |  51 ++---
 .../sql/connect/service/SparkConnectService.scala  |  11 ++
 .../spark/sql/connect/utils/ErrorUtils.scala       |  15 +-
 .../connect/planner/SparkConnectPlannerSuite.scala |   5 +-
 ...-error-conditions-invalid-handle-error-class.md |   4 +
 python/pyspark/sql/connect/proto/base_pb2.py       |   8 +-
 python/pyspark/sql/connect/proto/base_pb2.pyi      |  23 ++-
 python/pyspark/sql/connect/proto/base_pb2_grpc.py  |   2 +-
 18 files changed, 566 insertions(+), 159 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index d9d1963c958..74542f2b914 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1426,6 +1426,11 @@
           "Handle must be an UUID string of the format 
'00112233-4455-6677-8899-aabbccddeeff'"
         ]
       },
+      "OPERATION_ABANDONED" : {
+        "message" : [
+          "Operation was considered abandoned because of inactivity and 
removed."
+        ]
+      },
       "OPERATION_ALREADY_EXISTS" : {
         "message" : [
           "Operation already exists."
diff --git 
a/connector/connect/common/src/main/protobuf/spark/connect/base.proto 
b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
index 79dbadba5bb..65e2493f836 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -772,8 +772,10 @@ message ReleaseExecuteResponse {
   // Session id in which the release was running.
   string session_id = 1;
 
-  // Operation id of the operation which the release concerns.
-  string operation_id = 2;
+  // Operation id of the operation on which the release executed.
+  // If the operation couldn't be found (because e.g. it was concurrently 
released), will be unset.
+  // Otherwise, it will be equal to the operation_id from request.
+  optional string operation_id = 2;
 }
 
 // Main interface for the SparkConnect service.
@@ -809,7 +811,7 @@ service SparkConnectService {
   // Release an reattachable execution, or parts thereof.
   // The ExecutePlan must have been started with 
ReattachOptions.reattachable=true.
   // Non reattachable executions are released automatically and immediately 
after the ExecutePlan
-  // RPC and ReleaseExecute doesn't need to be used.
+  // RPC and ReleaseExecute may not be used.
   rpc ReleaseExecute(ReleaseExecuteRequest) returns (ReleaseExecuteResponse) {}
 }
 
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 51a9a1bf951..054ccbe6707 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -68,6 +68,30 @@ object Connect {
       .intConf
       .createWithDefault(1024)
 
+  val CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT =
+    ConfigBuilder("spark.connect.execute.manager.detachedTimeout")
+      .internal()
+      .doc("Timeout after which executions without an attached RPC will be 
removed.")
+      .version("3.5.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("5m")
+
+  val CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL =
+    ConfigBuilder("spark.connect.execute.manager.maintenanceInterval")
+      .internal()
+      .doc("Interval at which execution manager will search for abandoned 
executions to remove.")
+      .version("3.5.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("30s")
+
+  val CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE =
+    ConfigBuilder("spark.connect.execute.manager.abandonedTombstonesSize")
+      .internal()
+      .doc("Maximum size of the cache of abandoned executions.")
+      .version("3.5.0")
+      .intConf
+      .createWithDefaultString("10000")
+
   val CONNECT_EXECUTE_REATTACHABLE_ENABLED =
     ConfigBuilder("spark.connect.execute.reattachable.enabled")
       .internal()
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
index 7b51a90ca37..6b8fcde1156 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
@@ -24,7 +24,8 @@ import org.apache.spark.{SparkEnv, SparkSQLException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.connect.common.ProtoUtils
 import 
org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION,
 CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE}
-import org.apache.spark.sql.connect.service.ExecuteHolder
+import org.apache.spark.sql.connect.service.{ExecuteHolder, 
SparkConnectService}
+import org.apache.spark.sql.connect.utils.ErrorUtils
 
 /**
  * ExecuteGrpcResponseSender sends responses to the GRPC stream. It consumes 
responses from
@@ -37,12 +38,14 @@ import org.apache.spark.sql.connect.service.ExecuteHolder
 private[connect] class ExecuteGrpcResponseSender[T <: Message](
     val executeHolder: ExecuteHolder,
     grpcObserver: StreamObserver[T])
-    extends Logging {
+    extends Logging { self =>
 
+  // the executionObserver object is used as a synchronization lock between the
+  // ExecuteGrpcResponseSender consumer and ExecuteResponseObserver producer.
   private var executionObserver = executeHolder.responseObserver
     .asInstanceOf[ExecuteResponseObserver[T]]
 
-  private var detached = false
+  private var interrupted = false
 
   // Signal to wake up when grpcCallObserver.isReady()
   private val grpcCallObserverReadySignal = new Object
@@ -51,42 +54,50 @@ private[connect] class ExecuteGrpcResponseSender[T <: 
Message](
   private var consumeSleep = 0L
   private var sendSleep = 0L
 
+  // Thread handling the processing, in case it's done in the background.
+  private var backgroundThread: Option[Thread] = None
+
   /**
-   * Detach this sender from executionObserver. Called only from 
executionObserver that this
-   * sender is attached to. Lock on executionObserver is held, and notifyAll 
will wake up this
-   * sender if sleeping.
+   * Interrupt this sender and make it exit.
    */
-  def detach(): Unit = executionObserver.synchronized {
-    if (detached == true) {
-      throw new IllegalStateException("ExecuteGrpcResponseSender already 
detached!")
-    }
-    detached = true
+  def interrupt(): Unit = executionObserver.synchronized {
+    interrupted = true
     executionObserver.notifyAll()
   }
 
   def run(lastConsumedStreamIndex: Long): Unit = {
     if (executeHolder.reattachable) {
-      // In reattachable execution, check if grpcObserver is ready for 
sending, by using
-      // setOnReadyHandler of the ServerCallStreamObserver. Otherwise, calling 
grpcObserver.onNext
-      // can queue the responses without sending them, and it is unknown how 
far behind it is, and
-      // hence how much the executionObserver needs to buffer.
+      // In reattachable execution we use setOnReadyHandler and 
grpcCallObserver.isReady to control
+      // backpressure. See sendResponse.
       //
-      // Because OnReady events get queued on the same GRPC inboud queue as 
the executePlan or
-      // reattachExecute RPC handler that this is executing in, OnReady events 
will not arrive and
-      // not trigger the OnReadyHandler unless this thread returns from 
executePlan/reattachExecute.
+      // Because calls to OnReadyHandler get queued on the same GRPC inbound queue as the executePlan
+      // or reattachExecute RPC handler that this is executing in, they will 
not arrive and not
+      // trigger the OnReadyHandler unless this thread returns from 
executePlan/reattachExecute.
       // Therefore, we launch another thread to operate on the grpcObserver 
and send the responses,
       // while this thread will exit from the executePlan/reattachExecute 
call, allowing GRPC
       // to send the OnReady events.
       // See https://github.com/grpc/grpc-java/issues/7361
 
-      val t = new Thread(
-        s"SparkConnectGRPCSender_" +
-          
s"opId=${executeHolder.operationId}_startIndex=$lastConsumedStreamIndex") {
-        override def run(): Unit = {
-          execute(lastConsumedStreamIndex)
-        }
-      }
-      executeHolder.grpcSenderThreads += t
+      backgroundThread = Some(
+        new Thread(
+          s"SparkConnectGRPCSender_" +
+            
s"opId=${executeHolder.operationId}_startIndex=$lastConsumedStreamIndex") {
+          override def run(): Unit = {
+            try {
+              execute(lastConsumedStreamIndex)
+            } catch {
+              // This is executing in its own thread, so need to handle RPC errors like the
+              // SparkConnectService handlers do.
+              ErrorUtils.handleError(
+                "async-grpc-response-sender",
+                observer = grpcObserver,
+                userId = executeHolder.request.getUserContext.getUserId,
+                sessionId = executeHolder.request.getSessionId)
+            } finally {
+              executeHolder.removeGrpcResponseSender(self)
+            }
+          }
+        })
 
       val grpcCallObserver = 
grpcObserver.asInstanceOf[ServerCallStreamObserver[T]]
       grpcCallObserver.setOnReadyHandler(() => {
@@ -97,16 +108,17 @@ private[connect] class ExecuteGrpcResponseSender[T <: 
Message](
       })
 
       // Start the thread and exit
-      t.start()
+      backgroundThread.foreach(_.start())
     } else {
       // Non reattachable execute runs directly in the GRPC thread.
       try {
         execute(lastConsumedStreamIndex)
       } finally {
+        executeHolder.removeGrpcResponseSender(this)
         if (!executeHolder.reattachable) {
           // Non reattachable executions release here immediately.
           // (Reattachable executions release with ReleaseExecute RPC.)
-          executeHolder.close()
+          
SparkConnectService.executionManager.removeExecuteHolder(executeHolder.key)
         }
       }
     }
@@ -159,9 +171,8 @@ private[connect] class ExecuteGrpcResponseSender[T <: 
Message](
     while (!finished) {
       var response: Option[CachedStreamResponse[T]] = None
 
-      // Conditions for exiting the inner loop:
-      // 1. was detached from response observer
-      def detachedFromObserver = detached
+      // Conditions for exiting the inner loop (and helpers to compute them):
+      // 1. was interrupted
       // 2. has a response to send
       def gotResponse = response.nonEmpty
       // 3. sent everything from the stream and the stream is finished
@@ -170,24 +181,21 @@ private[connect] class ExecuteGrpcResponseSender[T <: 
Message](
       def deadlineLimitReached =
         sentResponsesSize > maximumResponseSize || deadlineTimeMillis < 
System.currentTimeMillis()
 
-      // Get next available response.
-      // Wait until either this sender got detached or next response is ready,
-      // or the stream is complete and it had already sent all responses.
       logTrace(s"Trying to get next response with index=$nextIndex.")
       executionObserver.synchronized {
         logTrace(s"Acquired executionObserver lock.")
         val sleepStart = System.nanoTime()
         var sleepEnd = 0L
-        while (!detachedFromObserver &&
+        while (!interrupted &&
           !gotResponse &&
           !streamFinished &&
           !deadlineLimitReached) {
           logTrace(s"Try to get response with index=$nextIndex from observer.")
           response = executionObserver.consumeResponse(nextIndex)
           logTrace(s"Response index=$nextIndex from observer: 
${response.isDefined}")
-          // If response is empty, release executionObserver lock and wait to 
get notified.
-          // The state of detached, response and lastIndex are change under 
lock in
-          // executionObserver, and will notify upon state change.
+          // If response is empty, release executionObserver monitor and wait 
to get notified.
+          // The state of interrupted, response and lastIndex are changed 
under executionObserver
+          // monitor, and will notify upon state change.
           if (response.isEmpty) {
             val timeout = Math.max(1, deadlineTimeMillis - 
System.currentTimeMillis())
             logTrace(s"Wait for response to become available with 
timeout=$timeout ms.")
@@ -197,7 +205,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: 
Message](
           }
         }
         logTrace(
-          s"Exiting loop: detached=$detached, " +
+          s"Exiting loop: interrupted=$interrupted, " +
             s"response=${response.map(r => 
ProtoUtils.abbreviate(r.response))}, " +
             s"lastIndex=${executionObserver.getLastResponseIndex()}, " +
             s"deadline=${deadlineLimitReached}")
@@ -208,10 +216,8 @@ private[connect] class ExecuteGrpcResponseSender[T <: 
Message](
       }
 
       // Process the outcome of the inner loop.
-      if (detachedFromObserver) {
-        // This sender got detached by the observer.
-        // This only happens if this RPC is actually dead, and the client 
already came back with
-        // a ReattachExecute RPC. Kill this RPC.
+      if (interrupted) {
+        // This sender got interrupted. Kill this RPC.
         logWarning(
           s"Got detached from opId=${executeHolder.operationId} at index 
${nextIndex - 1}." +
             s"totalTime=${System.nanoTime - startTime}ns " +
@@ -256,12 +262,11 @@ private[connect] class ExecuteGrpcResponseSender[T <: 
Message](
   }
 
   /**
-   * Send the response to the grpcCallObserver. In reattachable execution, we 
control the flow,
-   * and only pass the response to the grpcCallObserver when it's ready to 
send. Otherwise,
-   * grpcCallObserver.onNext() would return in a non-blocking way, but could 
queue responses
-   * without sending them if the client doesn't keep up receiving them. When 
pushing more
-   * responses to onNext(), there is no insight how far behind the service is 
in actually sending
-   * them out.
+   * Send the response to the grpcCallObserver.
+   *
+   * In reattachable execution, we control the backpressure and only send when 
the
+   * grpcCallObserver is in fact ready to send.
+   *
    * @param deadlineTimeMillis
    *   when reattachable, wait for ready stream until this deadline.
    * @return
@@ -278,12 +283,12 @@ private[connect] class ExecuteGrpcResponseSender[T <: 
Message](
       grpcObserver.onNext(response.response)
       true
     } else {
-      // In reattachable execution, we control the flow, and only pass the 
response to the
+      // In reattachable execution, we control the backpressure, and only pass 
the response to the
       // grpcCallObserver when it's ready to send.
       // Otherwise, grpcCallObserver.onNext() would return in a non-blocking 
way, but could queue
       // responses without sending them if the client doesn't keep up 
receiving them.
       // When pushing more responses to onNext(), there is no insight how far 
behind the service is
-      // in actually sending them out.
+      // in actually sending them out. See 
https://github.com/grpc/grpc-java/issues/1549
       // By sending responses only when grpcCallObserver.isReady(), we control 
that the actual
       // sending doesn't fall behind what we push from here.
       // By using the deadline, we exit the RPC if the responses aren't picked 
up by the client.
@@ -295,7 +300,13 @@ private[connect] class ExecuteGrpcResponseSender[T <: 
Message](
         logTrace(s"Acquired grpcCallObserverReadySignal lock.")
         val sleepStart = System.nanoTime()
         var sleepEnd = 0L
-        while (!grpcCallObserver.isReady() && deadlineTimeMillis >= 
System.currentTimeMillis()) {
+        // Conditions for exiting the inner loop
+        // 1. was interrupted
+        // 2. grpcCallObserver is ready to send more data
+        // 3. time deadline is reached
+        while (!interrupted &&
+          !grpcCallObserver.isReady() &&
+          deadlineTimeMillis >= System.currentTimeMillis()) {
           val timeout = Math.max(1, deadlineTimeMillis - 
System.currentTimeMillis())
           var sleepStart = System.nanoTime()
           logTrace(s"Wait for grpcCallObserver to become ready with 
timeout=$timeout ms.")
@@ -303,7 +314,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: 
Message](
           logTrace(s"Reacquired grpcCallObserverReadySignal lock after 
waiting.")
           sleepEnd = System.nanoTime()
         }
-        if (grpcCallObserver.isReady()) {
+        if (!interrupted && grpcCallObserver.isReady()) {
           val sleepTime = if (sleepEnd > 0L) sleepEnd - sleepStart else 0L
           logDebug(
             s"SEND opId=${executeHolder.operationId} 
responseId=${response.responseId} " +
@@ -313,7 +324,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: 
Message](
           grpcCallObserver.onNext(response.response)
           true
         } else {
-          logTrace(s"grpcCallObserver is not ready, exiting.")
+          logTrace(s"exiting sendResponse without sending")
           false
         }
       }
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
index 0573f7b3dae..d9db07fd228 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
@@ -148,8 +148,8 @@ private[connect] class ExecuteResponseObserver[T <: 
Message](val executeHolder:
 
   /** Attach a new consumer (ExecuteResponseGRPCSender). */
   def attachConsumer(newSender: ExecuteGrpcResponseSender[T]): Unit = 
synchronized {
-    // detach the current sender before attaching new one
-    responseSender.foreach(_.detach())
+    // interrupt the current sender before attaching new one
+    responseSender.foreach(_.interrupt())
     responseSender = Some(newSender)
   }
 
@@ -241,11 +241,6 @@ private[connect] class ExecuteResponseObserver[T <: 
Message](val executeHolder:
     finalProducedIndex.isDefined
   }
 
-  /** Consumer (ExecuteResponseGRPCSender) waits on the monitor of 
ExecuteResponseObserver. */
-  private def notifyConsumer(): Unit = {
-    notifyAll()
-  }
-
   /**
    * Remove cached responses after response with lastReturnedIndex is returned 
from getResponse.
    * Remove according to caching policy:
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
index 105af0dc0ba..bce07133392 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
@@ -84,19 +84,6 @@ private[connect] class ExecuteHolder(
     }
   }
 
-  /**
-   * True if there is currently an RPC (ExecutePlanRequest, ReattachExecute) 
attached to this
-   * execution.
-   */
-  var attached: Boolean = true
-
-  /**
-   * Threads that execute the ExecuteGrpcResponseSender and send the GRPC 
responses.
-   *
-   * TODO(SPARK-44625): Joining and cleaning up these threads during cleanup.
-   */
-  val grpcSenderThreads: mutable.ArrayBuffer[Thread] = new 
mutable.ArrayBuffer[Thread]()
-
   val responseObserver: ExecuteResponseObserver[proto.ExecutePlanResponse] =
     new ExecuteResponseObserver[proto.ExecutePlanResponse](this)
 
@@ -104,12 +91,35 @@ private[connect] class ExecuteHolder(
 
   private val runner: ExecuteThreadRunner = new ExecuteThreadRunner(this)
 
+  /** System.currentTimeMillis when this ExecuteHolder was created. */
+  val creationTime = System.currentTimeMillis()
+
+  /**
+   * None if there is currently an attached RPC (grpcResponseSenders not empty 
or during initial
+   * ExecutePlan handler). Otherwise, the System.currentTimeMillis when the 
last RPC detached
+   * (grpcResponseSenders became empty).
+   */
+  @volatile var lastAttachedRpcTime: Option[Long] = None
+
+  /** System.currentTimeMillis when this ExecuteHolder was closed. */
+  private var closedTime: Option[Long] = None
+
+  /**
+   * Attached ExecuteGrpcResponseSenders that send the GRPC responses.
+   *
+   * In most situations there is at most one, except in network hang cases where a stale one may
+   * temporarily remain before being interrupted by a new one in ReattachExecute.
+   */
+  private val grpcResponseSenders
+      : 
mutable.ArrayBuffer[ExecuteGrpcResponseSender[proto.ExecutePlanResponse]] =
+    new 
mutable.ArrayBuffer[ExecuteGrpcResponseSender[proto.ExecutePlanResponse]]()
+
   /**
    * Start the execution. The execution is started in a background thread in 
ExecuteThreadRunner.
    * Responses are produced and cached in ExecuteResponseObserver. A GRPC 
thread consumes the
    * responses by attaching an ExecuteGrpcResponseSender,
    * @see
-   *   attachAndRunGrpcResponseSender.
+   *   runGrpcResponseSender.
    */
   def start(): Unit = {
     runner.start()
@@ -128,8 +138,9 @@ private[connect] class ExecuteHolder(
    * @param responseSender
    *   the ExecuteGrpcResponseSender
    */
-  def attachAndRunGrpcResponseSender(
+  def runGrpcResponseSender(
       responseSender: ExecuteGrpcResponseSender[proto.ExecutePlanResponse]): 
Unit = {
+    addGrpcResponseSender(responseSender)
     responseSender.run(0)
   }
 
@@ -142,13 +153,49 @@ private[connect] class ExecuteHolder(
    *   the last response that was already consumed. The sender will start from 
response after
    *   that.
    */
-  def attachAndRunGrpcResponseSender(
+  def runGrpcResponseSender(
       responseSender: ExecuteGrpcResponseSender[proto.ExecutePlanResponse],
       lastConsumedResponseId: String): Unit = {
     val lastConsumedIndex = 
responseObserver.getResponseIndexById(lastConsumedResponseId)
+    addGrpcResponseSender(responseSender)
     responseSender.run(lastConsumedIndex)
   }
 
+  private def addGrpcResponseSender(
+      sender: ExecuteGrpcResponseSender[proto.ExecutePlanResponse]) = 
synchronized {
+    if (closedTime.isEmpty) {
+      grpcResponseSenders += sender
+      lastAttachedRpcTime = None
+    } else {
+      // execution is closing... interrupt it already.
+      sender.interrupt()
+    }
+  }
+
+  def removeGrpcResponseSender[_](sender: ExecuteGrpcResponseSender[_]): Unit 
= synchronized {
+    // if closed, we are shutting down and interrupting all senders already
+    if (closedTime.isEmpty) {
+      grpcResponseSenders -=
+        
sender.asInstanceOf[ExecuteGrpcResponseSender[proto.ExecutePlanResponse]]
+      if (grpcResponseSenders.isEmpty) {
+        lastAttachedRpcTime = Some(System.currentTimeMillis())
+      }
+    }
+  }
+
+  /**
+   * For a short period in ExecutePlan after creation and until 
runGrpcResponseSender is called,
+   * there is no attached response sender, yet we start with lastAttachedRpcTime = None, so we
+   * don't get garbage collected. End this grace period when the initial 
ExecutePlan ends.
+   */
+  def afterInitialRPC(): Unit = synchronized {
+    if (closedTime.isEmpty) {
+      if (grpcResponseSenders.isEmpty) {
+        lastAttachedRpcTime = Some(System.currentTimeMillis())
+      }
+    }
+  }
+
   /**
    * Remove cached responses from the response observer until and including 
the response with
    * given responseId.
@@ -168,15 +215,30 @@ private[connect] class ExecuteHolder(
   }
 
   /**
-   * Close the execution and remove it from the session. Note: it first 
interrupts the runner if
-   * it's still running, and it waits for it to finish.
+   * Interrupt (if still running) and close the execution.
+   *
+   * Called only by SparkConnectExecutionManager.removeExecuteHolder, which 
then also removes the
+   * execution from global tracking and from its session.
    */
-  def close(): Unit = {
-    runner.interrupt()
-    runner.join()
-    responseObserver.removeAll()
-    eventsManager.postClosed()
-    sessionHolder.removeExecuteHolder(operationId)
+  def close(): Unit = synchronized {
+    if (closedTime.isEmpty) {
+      // interrupt execution, if still running.
+      runner.interrupt()
+      // wait for execution to finish, to make sure no more results get pushed 
to responseObserver
+      runner.join()
+      // interrupt any attached grpcResponseSenders
+      grpcResponseSenders.foreach(_.interrupt())
+      // if there were still any grpcResponseSenders, register detach time
+      if (grpcResponseSenders.nonEmpty) {
+        lastAttachedRpcTime = Some(System.currentTimeMillis())
+        grpcResponseSenders.clear()
+      }
+      // remove all cached responses from observer
+      responseObserver.removeAll()
+      // post closed to UI
+      eventsManager.postClosed()
+      closedTime = Some(System.currentTimeMillis())
+    }
   }
 
   /**
@@ -187,6 +249,25 @@ private[connect] class ExecuteHolder(
     "SparkConnect_Execute_" +
       
s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Tag_${tag}"
   }
+
+  /** Get ExecuteInfo with information about this ExecuteHolder. */
+  def getExecuteInfo: ExecuteInfo = synchronized {
+    ExecuteInfo(
+      request = request,
+      userId = sessionHolder.userId,
+      sessionId = sessionHolder.sessionId,
+      operationId = operationId,
+      jobTag = jobTag,
+      sparkSessionTags = sparkSessionTags,
+      reattachable = reattachable,
+      status = eventsManager.status,
+      creationTime = creationTime,
+      lastAttachedRpcTime = lastAttachedRpcTime,
+      closedTime = closedTime)
+  }
+
+  /** Get key used by SparkConnectExecutionManager global tracker. */
+  def key: ExecuteKey = ExecuteKey(sessionHolder.userId, 
sessionHolder.sessionId, operationId)
 }
 
 /** Used to identify ExecuteHolder jobTag among SparkContext.SPARK_JOB_TAGS. */
@@ -221,3 +302,17 @@ object ExecuteSessionTag {
     if (sessionTag.startsWith(prefix)) Some(sessionTag) else None
   }
 }
+
+/** Information about an ExecuteHolder. */
+case class ExecuteInfo(
+    request: proto.ExecutePlanRequest,
+    userId: String,
+    sessionId: String,
+    operationId: String,
+    jobTag: String,
+    sparkSessionTags: Set[String],
+    reattachable: Boolean,
+    status: ExecuteStatus,
+    creationTime: Long,
+    lastAttachedRpcTime: Option[Long],
+    closedTime: Option[Long])
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index 29134f0dc0d..b828d78710f 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -24,8 +24,7 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.{JobArtifactSet, SparkException, SparkSQLException}
-import org.apache.spark.connect.proto
+import org.apache.spark.{JobArtifactSet, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SparkSession
@@ -42,7 +41,7 @@ import org.apache.spark.util.Utils
 case class SessionHolder(userId: String, sessionId: String, session: 
SparkSession)
     extends Logging {
 
-  val executions: ConcurrentMap[String, ExecuteHolder] =
+  private val executions: ConcurrentMap[String, ExecuteHolder] =
     new ConcurrentHashMap[String, ExecuteHolder]()
 
   val eventManager: SessionEventsManager = SessionEventsManager(this, new 
SystemClock())
@@ -56,23 +55,23 @@ case class SessionHolder(userId: String, sessionId: String, 
session: SparkSessio
   private lazy val listenerCache: ConcurrentMap[String, 
StreamingQueryListener] =
     new ConcurrentHashMap()
 
-  private[connect] def createExecuteHolder(request: proto.ExecutePlanRequest): 
ExecuteHolder = {
-    val executeHolder = new ExecuteHolder(request, this)
+  /** Add ExecuteHolder to this session. Called only by 
SparkConnectExecutionManager. */
+  private[service] def addExecuteHolder(executeHolder: ExecuteHolder): Unit = {
     val oldExecute = executions.putIfAbsent(executeHolder.operationId, 
executeHolder)
     if (oldExecute != null) {
-      throw new SparkSQLException(
-        errorClass = "INVALID_HANDLE.OPERATION_ALREADY_EXISTS",
-        messageParameters = Map("handle" -> executeHolder.operationId))
+      // the existence of this should already be checked by SparkConnectExecutionManager
+      throw new IllegalStateException(
+        s"ExecuteHolder with opId=${executeHolder.operationId} already 
exists!")
     }
-    executeHolder
   }
 
-  private[connect] def executeHolder(operationId: String): 
Option[ExecuteHolder] = {
-    Option(executions.get(operationId))
+  /** Remove ExecuteHolder from this session. Called only by SparkConnectExecutionManager. */
+  private[service] def removeExecuteHolder(operationId: String): Unit = {
+    executions.remove(operationId)
   }
 
-  private[connect] def removeExecuteHolder(operationId: String): Unit = {
-    executions.remove(operationId)
+  private[connect] def executeHolder(operationId: String): 
Option[ExecuteHolder] = {
+    Option(executions.get(operationId))
   }
 
   /**
@@ -80,7 +79,7 @@ case class SessionHolder(userId: String, sessionId: String, 
session: SparkSessio
    * @return
    *   list of operationIds of interrupted executions
    */
-  private[connect] def interruptAll(): Seq[String] = {
+  private[service] def interruptAll(): Seq[String] = {
     val interruptedIds = new mutable.ArrayBuffer[String]()
     executions.asScala.values.foreach { execute =>
       if (execute.interrupt()) {
@@ -95,7 +94,7 @@ case class SessionHolder(userId: String, sessionId: String, 
session: SparkSessio
    * @return
    *   list of operationIds of interrupted executions
    */
-  private[connect] def interruptTag(tag: String): Seq[String] = {
+  private[service] def interruptTag(tag: String): Seq[String] = {
     val interruptedIds = new mutable.ArrayBuffer[String]()
     executions.asScala.values.foreach { execute =>
       if (execute.sparkSessionTags.contains(tag)) {
@@ -112,7 +111,7 @@ case class SessionHolder(userId: String, sessionId: String, 
session: SparkSessio
    * @return
    *   list of operationIds of interrupted executions (one element or empty)
    */
-  private[connect] def interruptOperation(operationId: String): Seq[String] = {
+  private[service] def interruptOperation(operationId: String): Seq[String] = {
     val interruptedIds = new mutable.ArrayBuffer[String]()
     Option(executions.get(operationId)).foreach { execute =>
       if (execute.interrupt()) {
@@ -252,6 +251,12 @@ object SessionHolder {
 
   /** Creates a dummy session holder for use in tests. */
   def forTesting(session: SparkSession): SessionHolder = {
-    SessionHolder(userId = "testUser", sessionId = UUID.randomUUID().toString, 
session = session)
+    val ret =
+      SessionHolder(
+        userId = "testUser",
+        sessionId = UUID.randomUUID().toString,
+        session = session)
+    SparkConnectService.putSessionForTesting(ret)
+    ret
   }
 }
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala
index 9daf1e17b5e..1ab5f26f90b 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala
@@ -27,14 +27,15 @@ class SparkConnectExecutePlanHandler(responseObserver: 
StreamObserver[proto.Exec
     extends Logging {
 
   def handle(v: proto.ExecutePlanRequest): Unit = {
-    val sessionHolder = SparkConnectService
-      .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
-    val executeHolder = sessionHolder.createExecuteHolder(v)
-
-    executeHolder.eventsManager.postStarted()
-    executeHolder.start()
-    val responseSender =
-      new ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, 
responseObserver)
-    executeHolder.attachAndRunGrpcResponseSender(responseSender)
+    val executeHolder = 
SparkConnectService.executionManager.createExecuteHolder(v)
+    try {
+      executeHolder.eventsManager.postStarted()
+      executeHolder.start()
+      val responseSender =
+        new 
ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, 
responseObserver)
+      executeHolder.runGrpcResponseSender(responseSender)
+    } finally {
+      executeHolder.afterInitialRPC()
+    }
   }
 }
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
new file mode 100644
index 00000000000..ce1f6c93f6c
--- /dev/null
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import com.google.common.cache.CacheBuilder
+
+import org.apache.spark.{SparkEnv, SparkSQLException}
+import org.apache.spark.connect.proto
+import org.apache.spark.internal.Logging
+import 
org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE,
 CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT, 
CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL}
+
+// Unique key identifying execution by combination of user, session and 
operation id
+case class ExecuteKey(userId: String, sessionId: String, operationId: String)
+
+/**
+ * Global tracker of all ExecuteHolder executions.
+ *
+ * All ExecuteHolders are created, and removed through it. It keeps track of 
all the executions,
+ * and removes executions that have been abandoned.
+ */
+private[connect] class SparkConnectExecutionManager() extends Logging {
+
+  /** Hash table containing all current executions. Guarded by executionsLock. 
*/
+  private val executions: mutable.HashMap[ExecuteKey, ExecuteHolder] =
+    new mutable.HashMap[ExecuteKey, ExecuteHolder]()
+  private val executionsLock = new Object
+
+  /** Graveyard of tombstones of executions that were abandoned and removed. */
+  private val abandonedTombstones = CacheBuilder
+    .newBuilder()
+    
.maximumSize(SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE))
+    .build[ExecuteKey, ExecuteInfo]()
+
+  /** None if there are no executions. Otherwise, the time when the last 
execution was removed. */
+  private var lastExecutionTime: Option[Long] = 
Some(System.currentTimeMillis())
+
+  /** Executor for the periodic maintenance */
+  private var scheduledExecutor: Option[ScheduledExecutorService] = None
+
+  /**
+   * Create a new ExecuteHolder and register it with this global manager and 
with its session.
+   */
+  private[connect] def createExecuteHolder(request: proto.ExecutePlanRequest): 
ExecuteHolder = {
+    val sessionHolder = SparkConnectService
+      .getOrCreateIsolatedSession(request.getUserContext.getUserId, 
request.getSessionId)
+    val executeHolder = new ExecuteHolder(request, sessionHolder)
+    executionsLock.synchronized {
+      // Check if the operation already exists, both in active executions, and 
in the graveyard
+      // of tombstones of executions that have been abandoned.
+      // The latter is to prevent double execution when a client retries 
execution, thinking it
+      // never reached the server, but in fact it did, and already got removed 
as abandoned.
+      if (executions.get(executeHolder.key).isDefined) {
+        if (getAbandonedTombstone(executeHolder.key).isDefined) {
+          throw new SparkSQLException(
+            errorClass = "INVALID_HANDLE.OPERATION_ABANDONED",
+            messageParameters = Map("handle" -> executeHolder.operationId))
+        } else {
+          throw new SparkSQLException(
+            errorClass = "INVALID_HANDLE.OPERATION_ALREADY_EXISTS",
+            messageParameters = Map("handle" -> executeHolder.operationId))
+        }
+      }
+      sessionHolder.addExecuteHolder(executeHolder)
+      executions.put(executeHolder.key, executeHolder)
+      lastExecutionTime = None
+      logInfo(s"ExecuteHolder ${executeHolder.key} is created.")
+    }
+
+    schedulePeriodicChecks() // Starts the maintenance thread if it hasn't 
started.
+
+    executeHolder
+  }
+
+  /**
+   * Remove an ExecuteHolder from this global manager and from its session. 
Interrupt the
+   * execution if still running, free all resources.
+   */
+  private[connect] def removeExecuteHolder(key: ExecuteKey): Unit = {
+    var executeHolder: Option[ExecuteHolder] = None
+    executionsLock.synchronized {
+      executeHolder = executions.remove(key)
+      executeHolder.foreach(e => 
e.sessionHolder.removeExecuteHolder(e.operationId))
+      if (executions.isEmpty) {
+        lastExecutionTime = Some(System.currentTimeMillis())
+      }
+      logInfo(s"ExecuteHolder $key is removed.")
+    }
+    // close the execution outside the lock
+    executeHolder.foreach(_.close())
+  }
+
+  private[connect] def getExecuteHolder(key: ExecuteKey): 
Option[ExecuteHolder] = {
+    executionsLock.synchronized {
+      executions.get(key)
+    }
+  }
+
+  /** Get info about abandoned execution, if there is one. */
+  private[connect] def getAbandonedTombstone(key: ExecuteKey): 
Option[ExecuteInfo] = {
+    Option(abandonedTombstones.getIfPresent(key))
+  }
+
+  /**
+   * If there are no executions, return Left with System.currentTimeMillis of 
last active
+   * execution. Otherwise return Right with list of ExecuteInfo of all 
executions.
+   */
+  def listActiveExecutions: Either[Long, Seq[ExecuteInfo]] = 
executionsLock.synchronized {
+    if (executions.isEmpty) {
+      Left(lastExecutionTime.get)
+    } else {
+      Right(executions.values.map(_.getExecuteInfo).toBuffer.toSeq)
+    }
+  }
+
+  /**
+   * Return list of executions that got abandoned and removed by periodic 
maintenance. This is a
+   * cache, and the tombstones will be eventually removed.
+   */
+  def listAbandonedExecutions: Seq[ExecuteInfo] = {
+    abandonedTombstones.asMap.asScala.values.toBuffer.toSeq
+  }
+
+  private[service] def shutdown(): Unit = executionsLock.synchronized {
+    scheduledExecutor.foreach { executor =>
+      executor.shutdown()
+      executor.awaitTermination(1, TimeUnit.MINUTES)
+    }
+    scheduledExecutor = None
+  }
+
+  /**
+   * Schedules periodic maintenance checks if it is not already scheduled. The checks look for
+   * executions that have not been closed but are left with no RPC attached to them, and remove
+   * them after a timeout.
+   */
+  private def schedulePeriodicChecks(): Unit = executionsLock.synchronized {
+    val interval = 
SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL).toLong
+    val timeout = 
SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT).toLong
+
+    scheduledExecutor match {
+      case Some(_) => // Already running.
+      case None =>
+        logInfo(s"Starting thread for cleanup of abandoned executions every 
$interval ms")
+        scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
+        scheduledExecutor.get.scheduleAtFixedRate(
+          () => {
+            try periodicMaintenance(timeout)
+            catch {
+              case NonFatal(ex) => logWarning("Unexpected exception in 
periodic task", ex)
+            }
+          },
+          interval,
+          interval,
+          TimeUnit.MILLISECONDS)
+    }
+  }
+
+  // Visible for testing.
+  private[connect] def periodicMaintenance(timeout: Long): Unit = {
+    logInfo("Started periodic run of SparkConnectExecutionManager 
maintenance.")
+
+    // Find any detached executions that expired and should be removed.
+    val toRemove = new mutable.ArrayBuffer[ExecuteHolder]()
+    executionsLock.synchronized {
+      val nowMs = System.currentTimeMillis()
+
+      executions.values.foreach { executeHolder =>
+        executeHolder.lastAttachedRpcTime match {
+          case Some(detached) =>
+            if (detached + timeout < nowMs) {
+              toRemove += executeHolder
+            }
+          case _ => // execution is active
+        }
+      }
+    }
+    if (!toRemove.isEmpty) {
+      // .. and remove them.
+      toRemove.foreach { executeHolder =>
+        val info = executeHolder.getExecuteInfo
+        logInfo(s"Found execution $info that was abandoned and expired and 
will be removed.")
+        removeExecuteHolder(executeHolder.key)
+        abandonedTombstones.put(executeHolder.key, info)
+      }
+    }
+    logInfo("Finished periodic run of SparkConnectExecutionManager 
maintenance.")
+  }
+}
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
index b70c82ab137..393b832de87 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala
@@ -29,14 +29,25 @@ class SparkConnectReattachExecuteHandler(
     extends Logging {
 
   def handle(v: proto.ReattachExecuteRequest): Unit = {
-    val sessionHolder = SparkConnectService
-      .getIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
-    val executeHolder = 
sessionHolder.executeHolder(v.getOperationId).getOrElse {
-      logDebug(s"Reattach operation not found: ${v.getOperationId}")
-      throw new SparkSQLException(
-        errorClass = "INVALID_HANDLE.OPERATION_NOT_FOUND",
-        messageParameters = Map("handle" -> v.getOperationId))
-    }
+    val executeHolder = SparkConnectService.executionManager
+      .getExecuteHolder(ExecuteKey(v.getUserContext.getUserId, v.getSessionId, 
v.getOperationId))
+      .getOrElse {
+        if (SparkConnectService.executionManager
+            .getAbandonedTombstone(
+              ExecuteKey(v.getUserContext.getUserId, v.getSessionId, 
v.getOperationId))
+            .isDefined) {
+          logDebug(s"Reattach operation abandoned: ${v.getOperationId}")
+          throw new SparkSQLException(
+            errorClass = "INVALID_HANDLE.OPERATION_ABANDONED",
+            messageParameters = Map("handle" -> v.getOperationId))
+
+        } else {
+          logDebug(s"Reattach operation not found: ${v.getOperationId}")
+          throw new SparkSQLException(
+            errorClass = "INVALID_HANDLE.OPERATION_NOT_FOUND",
+            messageParameters = Map("handle" -> v.getOperationId))
+        }
+      }
     if (!executeHolder.reattachable) {
       logWarning(s"Reattach to not reattachable operation.")
       throw new SparkSQLException(
@@ -48,10 +59,10 @@ class SparkConnectReattachExecuteHandler(
       new ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, 
responseObserver)
     if (v.hasLastResponseId) {
       // start from response after lastResponseId
-      executeHolder.attachAndRunGrpcResponseSender(responseSender, 
v.getLastResponseId)
+      executeHolder.runGrpcResponseSender(responseSender, v.getLastResponseId)
     } else {
       // start from the start of the stream.
-      executeHolder.attachAndRunGrpcResponseSender(responseSender)
+      executeHolder.runGrpcResponseSender(responseSender)
     }
   }
 }
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseExecuteHandler.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseExecuteHandler.scala
index 244aafb81ab..a3a7815609e 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseExecuteHandler.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseExecuteHandler.scala
@@ -30,33 +30,36 @@ class SparkConnectReleaseExecuteHandler(
   def handle(v: proto.ReleaseExecuteRequest): Unit = {
     val sessionHolder = SparkConnectService
       .getIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
-    val executeHolder = 
sessionHolder.executeHolder(v.getOperationId).getOrElse {
-      throw new SparkSQLException(
-        errorClass = "INVALID_HANDLE.OPERATION_NOT_FOUND",
-        messageParameters = Map("handle" -> v.getOperationId))
-    }
-    if (!executeHolder.reattachable) {
-      throw new SparkSQLException(
-        errorClass = "INVALID_CURSOR.NOT_REATTACHABLE",
-        messageParameters = Map.empty)
-    }
-
-    if (v.hasReleaseAll) {
-      executeHolder.close()
-    } else if (v.hasReleaseUntil) {
-      val responseId = v.getReleaseUntil.getResponseId
-      executeHolder.releaseUntilResponseId(responseId)
-    } else {
-      throw new UnsupportedOperationException(s"Unknown ReleaseExecute type!")
-    }
-
-    val response = proto.ReleaseExecuteResponse
+
+    val responseBuilder = proto.ReleaseExecuteResponse
       .newBuilder()
       .setSessionId(v.getSessionId)
-      .setOperationId(v.getOperationId)
-      .build()
 
-    responseObserver.onNext(response)
+    // ExecuteHolder may be concurrently released by 
SparkConnectExecutionManager if the
+    // ReleaseExecute arrived after it was abandoned and timed out.
+    // An asynchronous ReleaseUntil operation may also arrive after ReleaseAll.
+    // Because of that, make it a no-op and don't fail if the ExecuteHolder is no longer there.
+    val executeHolderOption =
+      sessionHolder.executeHolder(v.getOperationId).foreach { executeHolder =>
+        if (!executeHolder.reattachable) {
+          throw new SparkSQLException(
+            errorClass = "INVALID_CURSOR.NOT_REATTACHABLE",
+            messageParameters = Map.empty)
+        }
+
+        responseBuilder.setOperationId(executeHolder.operationId)
+
+        if (v.hasReleaseAll) {
+          
SparkConnectService.executionManager.removeExecuteHolder(executeHolder.key)
+        } else if (v.hasReleaseUntil) {
+          val responseId = v.getReleaseUntil.getResponseId
+          executeHolder.releaseUntilResponseId(responseId)
+        } else {
+          throw new UnsupportedOperationException(s"Unknown ReleaseExecute 
type!")
+        }
+      }
+
+    responseObserver.onNext(responseBuilder.build())
     responseObserver.onCompleted()
   }
 }
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index 6e607037e6c..3b8e1b064f0 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -277,6 +277,8 @@ object SparkConnectService extends Logging {
   private val userSessionMapping =
     cacheBuilder(CACHE_SIZE, CACHE_TIMEOUT_SECONDS).build[SessionCacheKey, 
SessionHolder]()
 
+  private[connect] val executionManager = new SparkConnectExecutionManager()
+
   private[connect] val streamingSessionManager =
     new SparkConnectStreamingQueryCache()
 
@@ -352,6 +354,13 @@ object SparkConnectService extends Logging {
     userSessionMapping.invalidateAll()
   }
 
+  /**
+   * Used for testing.
+   */
+  private[connect] def putSessionForTesting(sessionHolder: SessionHolder): 
Unit = {
+    userSessionMapping.put((sessionHolder.userId, sessionHolder.sessionId), 
sessionHolder)
+  }
+
   private def newIsolatedSession(): SparkSession = {
     SparkSession.active.newSession()
   }
@@ -408,6 +417,8 @@ object SparkConnectService extends Logging {
         server.shutdownNow()
       }
     }
+    streamingSessionManager.shutdown()
+    executionManager.shutdown()
     userSessionMapping.invalidateAll()
     uiTab.foreach(_.detach())
   }
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
index 04f3e15d38e..2050ebc01aa 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
@@ -133,10 +133,23 @@ private[connect] object ErrorUtils extends Logging {
     }
     partial
       .andThen { case (original, wrapped) =>
+        if (events.isDefined) {
+          // Errors thrown inside execution are user query errors, return them as INFO.
+          logInfo(
+            s"Spark Connect error " +
+              s"during: $opType. UserId: $userId. SessionId: $sessionId.",
+            original)
+        } else {
+          // Other errors are server RPC errors, return them as ERROR.
+          logError(
+            s"Spark Connect RPC error " +
+              s"during: $opType. UserId: $userId. SessionId: $sessionId.",
+            original)
+        }
+
        // If ExecuteEventsManager is present, this is an execution error that needs to be
         // posted to it.
         events.foreach { executeEventsManager =>
-          logError(s"Error during: $opType. UserId: $userId. SessionId: 
$sessionId.", original)
           if (isInterrupted) {
             executeEventsManager.postCanceled()
           } else {
diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
index 40d83b07b75..39b4f40215d 100644
--- 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
+++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.connect.common.InvalidPlanInput
 import 
org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto
-import org.apache.spark.sql.connect.service.{ExecuteHolder, ExecuteStatus, 
SessionHolder, SessionStatus}
+import org.apache.spark.sql.connect.service.{ExecuteHolder, ExecuteStatus, 
SessionHolder, SessionStatus, SparkConnectService}
 import org.apache.spark.sql.execution.arrow.ArrowConverters
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, 
StructType}
@@ -131,9 +131,10 @@ trait SparkConnectPlanTest extends SharedSparkSession {
     val request = proto.ExecutePlanRequest
       .newBuilder()
       .setPlan(plan)
+      .setSessionId(sessionHolder.sessionId)
       .setUserContext(context)
       .build()
-    val executeHolder = sessionHolder.createExecuteHolder(request)
+    val executeHolder = 
SparkConnectService.executionManager.createExecuteHolder(request)
     executeHolder.eventsManager.status_(ExecuteStatus.Started)
     executeHolder
   }
diff --git a/docs/sql-error-conditions-invalid-handle-error-class.md 
b/docs/sql-error-conditions-invalid-handle-error-class.md
index f8d3eab4d1c..c4cbb48035f 100644
--- a/docs/sql-error-conditions-invalid-handle-error-class.md
+++ b/docs/sql-error-conditions-invalid-handle-error-class.md
@@ -29,6 +29,10 @@ This error class has the following derived error classes:
 
 Handle must be an UUID string of the format 
'00112233-4455-6677-8899-aabbccddeeff'
 
+## OPERATION_ABANDONED
+
+Operation was considered abandoned because of inactivity and removed.
+
 ## OPERATION_ALREADY_EXISTS
 
 Operation already exists.
diff --git a/python/pyspark/sql/connect/proto/base_pb2.py 
b/python/pyspark/sql/connect/proto/base_pb2.py
index 70363922d8e..fa1868b489c 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.py
+++ b/python/pyspark/sql/connect/proto/base_pb2.py
@@ -37,7 +37,7 @@ from pyspark.sql.connect.proto import types_pb2 as 
spark_dot_connect_dot_types__
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    
b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01
 
\x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02
 
\x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17
 [...]
+    
b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01
 
\x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02
 
\x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17
 [...]
 )
 
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
@@ -197,7 +197,7 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _RELEASEEXECUTEREQUEST_RELEASEUNTIL._serialized_start = 11110
     _RELEASEEXECUTEREQUEST_RELEASEUNTIL._serialized_end = 11157
     _RELEASEEXECUTERESPONSE._serialized_start = 11186
-    _RELEASEEXECUTERESPONSE._serialized_end = 11276
-    _SPARKCONNECTSERVICE._serialized_start = 11279
-    _SPARKCONNECTSERVICE._serialized_end = 12022
+    _RELEASEEXECUTERESPONSE._serialized_end = 11298
+    _SPARKCONNECTSERVICE._serialized_start = 11301
+    _SPARKCONNECTSERVICE._serialized_end = 12044
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi 
b/python/pyspark/sql/connect/proto/base_pb2.pyi
index a886ecbd618..8fd5fa7a056 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -2684,18 +2684,35 @@ class 
ReleaseExecuteResponse(google.protobuf.message.Message):
     session_id: builtins.str
     """Session id in which the release was running."""
     operation_id: builtins.str
-    """Operation id of the operation which the release concerns."""
+    """Operation id of the operation on which the release executed.
+    If the operation couldn't be found (because e.g. it was concurrently 
released), will be unset.
+    Otherwise, it will be equal to the operation_id from request.
+    """
     def __init__(
         self,
         *,
         session_id: builtins.str = ...,
-        operation_id: builtins.str = ...,
+        operation_id: builtins.str | None = ...,
     ) -> None: ...
+    def HasField(
+        self,
+        field_name: typing_extensions.Literal[
+            "_operation_id", b"_operation_id", "operation_id", b"operation_id"
+        ],
+    ) -> builtins.bool: ...
     def ClearField(
         self,
         field_name: typing_extensions.Literal[
-            "operation_id", b"operation_id", "session_id", b"session_id"
+            "_operation_id",
+            b"_operation_id",
+            "operation_id",
+            b"operation_id",
+            "session_id",
+            b"session_id",
         ],
     ) -> None: ...
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_operation_id", 
b"_operation_id"]
+    ) -> typing_extensions.Literal["operation_id"] | None: ...
 
 global___ReleaseExecuteResponse = ReleaseExecuteResponse
diff --git a/python/pyspark/sql/connect/proto/base_pb2_grpc.py 
b/python/pyspark/sql/connect/proto/base_pb2_grpc.py
index 2702ec71ff5..e6bfda8a40a 100644
--- a/python/pyspark/sql/connect/proto/base_pb2_grpc.py
+++ b/python/pyspark/sql/connect/proto/base_pb2_grpc.py
@@ -130,7 +130,7 @@ class SparkConnectServiceServicer(object):
         """Release an reattachable execution, or parts thereof.
         The ExecutePlan must have been started with 
ReattachOptions.reattachable=true.
         Non reattachable executions are released automatically and immediately 
after the ExecutePlan
-        RPC and ReleaseExecute doesn't need to be used.
+        RPC and ReleaseExecute may not be used.
         """
         context.set_code(grpc.StatusCode.UNIMPLEMENTED)
         context.set_details("Method not implemented!")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
