[ https://issues.apache.org/jira/browse/SPARK-23252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16344484#comment-16344484 ]
Bang Xiao edited comment on SPARK-23252 at 1/30/18 4:30 AM: ------------------------------------------------------------ After the executor and NodeManager is killed, failure tasks never relaunched because of reason is not yet known. {code:java} CoarseGrainedSchedulerBackend.scala: protected def disableExecutor(executorId: String): Boolean = { val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized { if (executorIsAlive(executorId)) { executorsPendingLossReason += executorId true } else { // Returns true for explicitly killed executors, we also need to get pending loss reasons; // For others return false. executorsPendingToRemove.contains(executorId) } } if (shouldDisable) { logInfo(s"Disabling executor $executorId.") scheduler.executorLost(executorId, LossReasonPending) } shouldDisable }{code} TaskSchedulerImpl will handle executorLost and removeExecutor {code:java} TaskSchedulerImpl.scala: private def removeExecutor(executorId: String, reason: ExecutorLossReason) { // The tasks on the lost executor may not send any more status updates (because the executor // has been lost), so they should be cleaned up here. executorIdToRunningTaskIds.remove(executorId).foreach { taskIds => logDebug("Cleaning up TaskScheduler state for tasks " + s"${taskIds.mkString("[", ",", "]")} on failed executor $executorId") // We do not notify the TaskSetManager of the task failures because that will // happen below in the rootPool.executorLost() call. taskIds.foreach(cleanupTaskState) } val host = executorIdToHost(executorId) val execs = hostToExecutors.getOrElse(host, new HashSet) execs -= executorId if (execs.isEmpty) { hostToExecutors -= host for (rack <- getRackForHost(host); hosts <- hostsByRack.get(rack)) { hosts -= host if (hosts.isEmpty) { hostsByRack -= rack } } } if (reason != LossReasonPending) { executorIdToHost -= executorId rootPool.executorLost(executorId, host, reason) } blacklistTrackerOpt.foreach(_.handleRemovedExecutor(executorId)) } {code} but if the reason is LossReasonPending, it will not trigger lost tasks relaunched. This is consistent with what I've observed from the log. was (Author: chopinxb): After the executor and NodeManager is killed, failure tasks never relaunched because of reason is not yet known. {code:java} CoarseGrainedSchedulerBackend.scala: protected def disableExecutor(executorId: String): Boolean = { val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized { if (executorIsAlive(executorId)) { executorsPendingLossReason += executorId true } else { // Returns true for explicitly killed executors, we also need to get pending loss reasons; // For others return false. executorsPendingToRemove.contains(executorId) } } if (shouldDisable) { logInfo(s"Disabling executor $executorId.") scheduler.executorLost(executorId, LossReasonPending) } shouldDisable }{code} TaskSchedulerImpl will handle executorLost and removeExecutor {code:java} TaskSchedulerImpl.scala: private def removeExecutor(executorId: String, reason: ExecutorLossReason) { // The tasks on the lost executor may not send any more status updates (because the executor // has been lost), so they should be cleaned up here. executorIdToRunningTaskIds.remove(executorId).foreach { taskIds => logDebug("Cleaning up TaskScheduler state for tasks " + s"${taskIds.mkString("[", ",", "]")} on failed executor $executorId") // We do not notify the TaskSetManager of the task failures because that will // happen below in the rootPool.executorLost() call. taskIds.foreach(cleanupTaskState) } val host = executorIdToHost(executorId) val execs = hostToExecutors.getOrElse(host, new HashSet) execs -= executorId if (execs.isEmpty) { hostToExecutors -= host for (rack <- getRackForHost(host); hosts <- hostsByRack.get(rack)) { hosts -= host if (hosts.isEmpty) { hostsByRack -= rack } } } if (reason != LossReasonPending) { executorIdToHost -= executorId rootPool.executorLost(executorId, host, reason) } blacklistTrackerOpt.foreach(_.handleRemovedExecutor(executorId)) } {code} but if the reason is LossReasonPending, it will not trigger lost tasks relaunched. This is consistent with what I've observed from the log. > When NodeManager and CoarseGrainedExecutorBackend processes are killed, the > job will be blocked > ----------------------------------------------------------------------------------------------- > > Key: SPARK-23252 > URL: https://issues.apache.org/jira/browse/SPARK-23252 > Project: Spark > Issue Type: Bug > Components: Spark Core > Affects Versions: 2.2.0 > Reporter: Bang Xiao > Priority: Major > > This happens when 'spark.dynamicAllocation.enabled' is set to be 'true'. We > use Yarn as our resource manager. > 1,spark-submit "JavaWordCount" application in yarn-client mode > 2, Kill NodeManager and CoarseGrainedExecutorBackend processes in one node > when the job is in stage 0 > if we just kill all CoarseGrainedExecutorBackend in that node, TaskSetManager > will pending the failure task to resubmit. but if the NodeManager and > CoarseGrainedExecutorBackend processes killed simultaneously,the whole job > will be blocked. -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org