This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new c34ec411244  [SPARK-44818] Fix race for pending task kill issued before taskThread is initialized

c34ec411244 is described below

commit c34ec41124446164e9cfdd34101f25c6aa0ae235
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Mon Aug 21 13:25:28 2023 -0700

    [SPARK-44818] Fix race for pending task kill issued before taskThread is initialized

### What changes were proposed in this pull request?

Fix a race for a pending task kill issued before taskThread is initialized.

### Why are the changes needed?

There is a race for tasks that are interrupted through stage cancellation: a task may already have been added to the TaskSet but not yet have its taskThread initialized. Stage cancellation tries to kill all ongoing task attempts:

```
logInfo("Cancelling stage " + stageId)
// Kill all running tasks for the stage.
killAllTaskAttempts(stageId, interruptThread, reason = "Stage cancelled: " + reason)
// Cancel all attempts for the stage.
```

However, there is a chance that taskThread is not initialized yet, in which case only reasonIfKilled is set and the interrupt is never delivered:

```
def kill(interruptThread: Boolean, reason: String): Unit = {
  require(reason != null)
  _reasonIfKilled = reason
  if (context != null) {
    context.markInterrupted(reason)
  }
  if (interruptThread && taskThread != null) {
    taskThread.interrupt() // <--- never hit
  }
}
```

Then, within the task execution thread itself, kill is called again because reasonIfKilled is set. However, this time interruptThread is passed as false explicitly, since the status of the previous call is unknown:

```
taskThread = Thread.currentThread()
if (_reasonIfKilled != null) {
  kill(interruptThread = false, _reasonIfKilled) // <-- only context will be set
}
```

By this point the TaskReaper has also finished its previous and only attempt at task interruption, since it does not retry in this case.
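The lost-interrupt race described above can be illustrated with a minimal, self-contained sketch. The class and member names below (`TaskRunnerSketch`, and the `RuntimeException` standing in for Spark's `TaskKilledException`) are simplified stand-ins for illustration, not Spark's actual implementation:

```scala
// Minimal sketch of the race: a kill that arrives before taskThread is
// assigned records only the kill reason; the interrupt is silently dropped.
class TaskRunnerSketch {
  @volatile private var taskThread: Thread = _
  @volatile private var reasonIfKilled: String = _

  def kill(interruptThread: Boolean, reason: String): Unit = {
    reasonIfKilled = reason
    if (interruptThread && taskThread != null) {
      taskThread.interrupt() // skipped when taskThread is still null
    }
  }

  def run(): Unit = {
    taskThread = Thread.currentThread()
    // The essence of the SPARK-44818 fix: if a kill was recorded before the
    // thread was assigned, bail out before running the task body instead of
    // relying on an interrupt that was never delivered.
    if (reasonIfKilled != null) {
      throw new RuntimeException(s"Task killed: $reasonIfKilled")
    }
    // ... actual task body would execute here ...
  }
}
```

A kill issued before `run()` starts leaves `reasonIfKilled` set but `taskThread` null, so `interrupt()` is never called; the guard at the top of `run()` plays the role that `killTaskIfInterrupted()` plays in the real change.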
Eventually the task is never interrupted even once, and it gets blocked on I/O or wait calls that might not finish within the reaper timeout, leading to the JVM being killed:

```
taskRunner.kill(interruptThread = interruptThread, reason = reason)
```

This change fixes the issue by checking for the presence of `reasonIfKilled` on the context and throwing a `TaskKilledException` before `runTask` executes, thereby preventing execution of the actual task, freeing up the slot, and avoiding future issues with the reaper.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing unit tests

```
[info] JobCancellationSuite:
...
[info] Run completed in 35 seconds, 781 milliseconds.
[info] Total number of tests run: 13
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 13, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

Closes #42504 from anishshri-db/task/SPARK-44818.

Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
Signed-off-by: Josh Rosen <joshro...@databricks.com>
---
 core/src/main/scala/org/apache/spark/TaskContext.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 450c00928c9..0f8a10d734b 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -158,6 +158,11 @@ abstract class TaskContext extends Serializable {
   /** Runs a task with this context, ensuring failure and completion listeners get triggered. */
   private[spark] def runTaskWithListeners[T](task: Task[T]): T = {
     try {
+      // SPARK-44818 - It's possible that taskThread has not been initialized when kill is
+      // initially called with interruptThread=true. We do set the reason and eventually will
+      // set it on the context too within run(). If that's the case, kill the thread before
+      // it starts executing the actual task.
+      killTaskIfInterrupted()
       task.runTask(this)
     } catch {
       case e: Throwable =>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org