Repository: spark Updated Branches: refs/heads/branch-2.2 0b51fd3eb -> be68f86e1
[SPARK-22535][PYSPARK] Sleep before killing the python worker in PythonRunner.MonitorThread (branch-2.2) ## What changes were proposed in this pull request? Backport #19762 to 2.2 ## How was this patch tested? Jenkins Author: Shixiong Zhu <zsxw...@gmail.com> Closes #19768 from zsxwing/SPARK-22535-2.2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be68f86e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be68f86e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be68f86e Branch: refs/heads/branch-2.2 Commit: be68f86e11d64209d9e325ce807025318f383bea Parents: 0b51fd3 Author: Shixiong Zhu <zsxw...@gmail.com> Authored: Thu Nov 16 14:41:05 2017 -0800 Committer: Shixiong Zhu <zsxw...@gmail.com> Committed: Thu Nov 16 14:41:05 2017 -0800 ---------------------------------------------------------------------- .../org/apache/spark/api/python/PythonRDD.scala | 21 ++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/be68f86e/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 807b51f..63ae705 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -360,6 +360,9 @@ private[spark] class PythonRunner( class MonitorThread(env: SparkEnv, worker: Socket, context: TaskContext) extends Thread(s"Worker Monitor for $pythonExec") { + /** How long to wait before killing the python worker if a task cannot be interrupted. 
*/ + private val taskKillTimeout = env.conf.getTimeAsMs("spark.python.task.killTimeout", "2s") + setDaemon(true) override def run() { @@ -369,12 +372,18 @@ private[spark] class PythonRunner( Thread.sleep(2000) } if (!context.isCompleted) { - try { - logWarning("Incomplete task interrupted: Attempting to kill Python Worker") - env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker) - } catch { - case e: Exception => - logError("Exception when trying to kill worker", e) + Thread.sleep(taskKillTimeout) + if (!context.isCompleted) { + try { + // Mimic the task name used in `Executor` to help the user find out the task to blame. + val taskName = s"${context.partitionId}.${context.taskAttemptId} " + + s"in stage ${context.stageId} (TID ${context.taskAttemptId})" + logWarning(s"Incomplete task $taskName interrupted: Attempting to kill Python Worker") + env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker) + } catch { + case e: Exception => + logError("Exception when trying to kill worker", e) + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org