[ 
https://issues.apache.org/jira/browse/SPARK-23252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16344484#comment-16344484
 ] 

Bang Xiao edited comment on SPARK-23252 at 1/30/18 4:30 AM:
------------------------------------------------------------

After the executor and the NodeManager are killed, the failed tasks are never relaunched 
because the executor's loss reason is not yet known.
{code:java}
CoarseGrainedSchedulerBackend.scala:

/**
 * Disables the given executor while its loss reason is still pending.
 *
 * An executor that is still alive is added to `executorsPendingLossReason`. An executor that is
 * not alive is disabled only if it is in `executorsPendingToRemove`. When the executor is
 * disabled, `scheduler.executorLost` is called with `LossReasonPending`.
 *
 * @param executorId ID of the executor to disable
 * @return true if the executor was disabled, false otherwise
 */
protected def disableExecutor(executorId: String): Boolean = {
  // The decision is made under the backend's lock; the scheduler is notified outside of it.
  val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
    if (executorIsAlive(executorId)) {
      executorsPendingLossReason += executorId
      true
    } else {
      // Returns true for explicitly killed executors, we also need to get pending loss reasons;
      // For others return false.
      executorsPendingToRemove.contains(executorId)
    }
  }
  if (shouldDisable) {
    logInfo(s"Disabling executor $executorId.")
    scheduler.executorLost(executorId, LossReasonPending)
  }
  shouldDisable
}
{code}
TaskSchedulerImpl will handle executorLost and removeExecutor
{code:java}
TaskSchedulerImpl.scala:
/**
 * Cleans up the scheduler's bookkeeping for a lost executor.
 *
 * Clears the state of the tasks that were running on the executor and removes the executor
 * from `hostToExecutors` (and its host from `hostsByRack` once the host has no executors left).
 * `rootPool.executorLost` is called, and the executor dropped from `executorIdToHost`, only
 * when `reason` is not `LossReasonPending`.
 *
 * @param executorId ID of the executor that was lost
 * @param reason why the executor was lost; `LossReasonPending` skips the
 *               `rootPool.executorLost` notification
 */
private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
  // The tasks on the lost executor may not send any more status updates (because the executor
  // has been lost), so they should be cleaned up here.
  executorIdToRunningTaskIds.remove(executorId).foreach { taskIds =>
    logDebug("Cleaning up TaskScheduler state for tasks " +
      s"${taskIds.mkString("[", ",", "]")} on failed executor $executorId")
    // We do not notify the TaskSetManager of the task failures because that will
    // happen below in the rootPool.executorLost() call.
    taskIds.foreach(cleanupTaskState)
  }

  val host = executorIdToHost(executorId)
  val execs = hostToExecutors.getOrElse(host, new HashSet)
  execs -= executorId
  if (execs.isEmpty) {
    hostToExecutors -= host
    for (rack <- getRackForHost(host); hosts <- hostsByRack.get(rack)) {
      hosts -= host
      if (hosts.isEmpty) {
        hostsByRack -= rack
      }
    }
  }

  // While the loss reason is still pending, the executor-to-host mapping is kept and
  // rootPool.executorLost() is not called.
  if (reason != LossReasonPending) {
    executorIdToHost -= executorId
    rootPool.executorLost(executorId, host, reason)
  }
  blacklistTrackerOpt.foreach(_.handleRemovedExecutor(executorId))
}
{code}
But if the reason is LossReasonPending, rootPool.executorLost() is skipped, so the lost 
tasks are never relaunched.

This is consistent with what I've observed from the log.

 


was (Author: chopinxb):
After the executor and the NodeManager are killed, the failed tasks are never relaunched 
because the executor's loss reason is not yet known.
{code:java}
CoarseGrainedSchedulerBackend.scala:

/**
 * Disables the given executor while its loss reason is still pending.
 *
 * An executor that is still alive is added to `executorsPendingLossReason`. An executor that is
 * not alive is disabled only if it is in `executorsPendingToRemove`. When the executor is
 * disabled, `scheduler.executorLost` is called with `LossReasonPending`.
 *
 * @param executorId ID of the executor to disable
 * @return true if the executor was disabled, false otherwise
 */
protected def disableExecutor(executorId: String): Boolean = {
  // The decision is made under the backend's lock; the scheduler is notified outside of it.
  val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
    if (executorIsAlive(executorId)) {
      executorsPendingLossReason += executorId
      true
    } else {
      // Returns true for explicitly killed executors, we also need to get pending loss reasons;
      // For others return false.
      executorsPendingToRemove.contains(executorId)
    }
  }
  if (shouldDisable) {
    logInfo(s"Disabling executor $executorId.")
    scheduler.executorLost(executorId, LossReasonPending)
  }
  shouldDisable
}
{code}
TaskSchedulerImpl will handle executorLost and removeExecutor
{code:java}
TaskSchedulerImpl.scala:
/**
 * Cleans up the scheduler's bookkeeping for a lost executor.
 *
 * Clears the state of the tasks that were running on the executor and removes the executor
 * from `hostToExecutors` (and its host from `hostsByRack` once the host has no executors left).
 * `rootPool.executorLost` is called, and the executor dropped from `executorIdToHost`, only
 * when `reason` is not `LossReasonPending`.
 *
 * @param executorId ID of the executor that was lost
 * @param reason why the executor was lost; `LossReasonPending` skips the
 *               `rootPool.executorLost` notification
 */
private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
  // The tasks on the lost executor may not send any more status updates (because the executor
  // has been lost), so they should be cleaned up here.
  executorIdToRunningTaskIds.remove(executorId).foreach { taskIds =>
    logDebug("Cleaning up TaskScheduler state for tasks " +
      s"${taskIds.mkString("[", ",", "]")} on failed executor $executorId")
    // We do not notify the TaskSetManager of the task failures because that will
    // happen below in the rootPool.executorLost() call.
    taskIds.foreach(cleanupTaskState)
  }

  val host = executorIdToHost(executorId)
  val execs = hostToExecutors.getOrElse(host, new HashSet)
  execs -= executorId
  if (execs.isEmpty) {
    hostToExecutors -= host
    for (rack <- getRackForHost(host); hosts <- hostsByRack.get(rack)) {
      hosts -= host
      if (hosts.isEmpty) {
        hostsByRack -= rack
      }
    }
  }

  // While the loss reason is still pending, the executor-to-host mapping is kept and
  // rootPool.executorLost() is not called.
  if (reason != LossReasonPending) {
    executorIdToHost -= executorId
    rootPool.executorLost(executorId, host, reason)
  }
  blacklistTrackerOpt.foreach(_.handleRemovedExecutor(executorId))
}
{code}
But if the reason is LossReasonPending, rootPool.executorLost() is skipped, so the lost 
tasks are never relaunched.

This is consistent with what I've observed from the log.

 

> When NodeManager and CoarseGrainedExecutorBackend processes are killed, the 
> job will be blocked
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-23252
>                 URL: https://issues.apache.org/jira/browse/SPARK-23252
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Bang Xiao
>            Priority: Major
>
> This happens when 'spark.dynamicAllocation.enabled' is set to 'true'. We 
> use YARN as our resource manager. 
> 1. spark-submit the "JavaWordCount" application in yarn-client mode.
> 2. Kill the NodeManager and CoarseGrainedExecutorBackend processes on one node 
> while the job is in stage 0.
> If we kill only the CoarseGrainedExecutorBackend processes on that node, the 
> TaskSetManager marks the failed tasks as pending and resubmits them. But if the 
> NodeManager and CoarseGrainedExecutorBackend processes are killed simultaneously, 
> the whole job is blocked. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to