changgyoopark-db commented on code in PR #48208:
URL: https://github.com/apache/spark/pull/48208#discussion_r1773214868


##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala:
##########
@@ -139,23 +129,50 @@ private[connect] class ExecuteThreadRunner(executeHolder: 
ExecuteHolder) extends
         }
       }
     } catch {
-      ErrorUtils.handleError(
-        "execute",
-        executeHolder.responseObserver,
-        executeHolder.sessionHolder.userId,
-        executeHolder.sessionHolder.sessionId,
-        Some(executeHolder.eventsManager),
-        interrupted)
+      case e: Throwable if state.getAcquire() != 
ThreadState.startedInterrupted =>
+        ErrorUtils.handleError(
+          "execute",
+          executeHolder.responseObserver,
+          executeHolder.sessionHolder.userId,
+          executeHolder.sessionHolder.sessionId,
+          Some(executeHolder.eventsManager),
+          false)(e)
+    } finally {
+      // Make sure to transition to completed in order to prevent the thread 
from being interrupted
+      // afterwards.
+      var currentState = state.getAcquire()
+      while (currentState == ThreadState.started ||
+        currentState == ThreadState.startedInterrupted) {
+        val interrupted = currentState == ThreadState.startedInterrupted
+        val prevState = state.compareAndExchangeRelease(currentState, 
ThreadState.completed)
+        if (prevState == currentState) {
+          if (interrupted) {
+            try {
+              ErrorUtils.handleError(
+                "execute",
+                executeHolder.responseObserver,
+                executeHolder.sessionHolder.userId,
+                executeHolder.sessionHolder.sessionId,
+                Some(executeHolder.eventsManager),
+                true)(new SparkSQLException("OPERATION_CANCELED", Map.empty))
+            } finally {
+              executeHolder.cleanup()
+            }
+          }
+          return
+        }
+        currentState = prevState
+      }
     }
   }
 
   // Inner executeInternal is wrapped by execute() for error handling.
-  private def executeInternal() = {
-    // synchronized - check if already got interrupted while starting.
-    lock.synchronized {
-      if (interrupted) {
-        throw new InterruptedException()
-      }
+  private def executeInternal(): Unit = {
+    val prevState = state.compareAndExchangeRelease(ThreadState.notStarted, 
ThreadState.started)
+    if (prevState != ThreadState.notStarted && prevState != 
ThreadState.started) {
+      // Silently return, expecting that the caller would handle the 
interruption.
+      assert(prevState != ThreadState.completed)

Review Comment:
   If we remove the state transition in `start`, then you're right. I'll make 
it clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to