changgyoopark-db commented on code in PR #48208: URL: https://github.com/apache/spark/pull/48208#discussion_r1773214868
########## sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala: ########## @@ -139,23 +129,50 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends } } } catch { - ErrorUtils.handleError( - "execute", - executeHolder.responseObserver, - executeHolder.sessionHolder.userId, - executeHolder.sessionHolder.sessionId, - Some(executeHolder.eventsManager), - interrupted) + case e: Throwable if state.getAcquire() != ThreadState.startedInterrupted => + ErrorUtils.handleError( + "execute", + executeHolder.responseObserver, + executeHolder.sessionHolder.userId, + executeHolder.sessionHolder.sessionId, + Some(executeHolder.eventsManager), + false)(e) + } finally { + // Make sure to transition to completed in order to prevent the thread from being interrupted + // afterwards. + var currentState = state.getAcquire() + while (currentState == ThreadState.started || + currentState == ThreadState.startedInterrupted) { + val interrupted = currentState == ThreadState.startedInterrupted + val prevState = state.compareAndExchangeRelease(currentState, ThreadState.completed) + if (prevState == currentState) { + if (interrupted) { + try { + ErrorUtils.handleError( + "execute", + executeHolder.responseObserver, + executeHolder.sessionHolder.userId, + executeHolder.sessionHolder.sessionId, + Some(executeHolder.eventsManager), + true)(new SparkSQLException("OPERATION_CANCELED", Map.empty)) + } finally { + executeHolder.cleanup() + } + } + return + } + currentState = prevState + } } } // Inner executeInternal is wrapped by execute() for error handling. - private def executeInternal() = { - // synchronized - check if already got interrupted while starting. - lock.synchronized { - if (interrupted) { - throw new InterruptedException() - } + private def executeInternal(): Unit = { + val prevState = state.compareAndExchangeRelease(ThreadState.notStarted, ThreadState.started) + if (prevState != ThreadState.notStarted && prevState != ThreadState.started) { + // Silently return, expecting that the caller would handle the interruption. + assert(prevState != ThreadState.completed) Review Comment: If we remove the state transition in `start`, then you're right. I'll make it clearer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org