[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...
Github user dhruve commented on a diff in the pull request:
https://github.com/apache/spark/pull/22288#discussion_r216788096

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl(
           launchedAnyTask |= launchedTaskAtCurrentMaxLocality
         } while (launchedTaskAtCurrentMaxLocality)
       }
+
       if (!launchedAnyTask) {
-        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-      }
+        taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+          case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+
+            // If the taskSet is unschedulable we kill an existing blacklisted executor/s and
+            // kick off an abortTimer which after waiting will abort the taskSet if we were
+            // unable to schedule any task from the taskSet.
+            // Note: We keep a track of schedulability on a per taskSet basis rather than on a
+            // per task basis.
+            val executor = hostToExecutors.valuesIterator.next().iterator.next()
+            logDebug("Killing executor because of task unschedulability: " + executor)
--- End diff --

noted.

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user dhruve commented on a diff in the pull request:
https://github.com/apache/spark/pull/22288#discussion_r216788079

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl(
           launchedAnyTask |= launchedTaskAtCurrentMaxLocality
         } while (launchedTaskAtCurrentMaxLocality)
       }
+
       if (!launchedAnyTask) {
-        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-      }
+        taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+          case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+
+            // If the taskSet is unschedulable we kill an existing blacklisted executor/s and
+            // kick off an abortTimer which after waiting will abort the taskSet if we were
+            // unable to schedule any task from the taskSet.
+            // Note: We keep a track of schedulability on a per taskSet basis rather than on a
+            // per task basis.
+            val executor = hostToExecutors.valuesIterator.next().iterator.next()
+            logDebug("Killing executor because of task unschedulability: " + executor)
+            blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor))
+
+            if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+              unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis()
--- End diff --

noted.
Github user dhruve commented on a diff in the pull request:
https://github.com/apache/spark/pull/22288#discussion_r216788016

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -623,8 +623,9 @@ private[spark] class TaskSetManager(
    *
    * It is possible that this taskset has become impossible to schedule *anywhere* due to the
    * blacklist. The most common scenario would be if there are fewer executors than
-   * spark.task.maxFailures. We need to detect this so we can fail the task set, otherwise the job
-   * will hang.
+   * spark.task.maxFailures. We need to detect this so we can avoid the job from being hung.
+   * If dynamic allocation is enabled we try to acquire new executor/s by killing the existing one.
+   * In case of static allocation we abort the taskSet immediately to fail the job.
--- End diff --

Yes. The change of removing a single executor takes care of static allocation as well. I will update the comments.
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/22288#discussion_r216724373

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl(
           launchedAnyTask |= launchedTaskAtCurrentMaxLocality
         } while (launchedTaskAtCurrentMaxLocality)
       }
+
       if (!launchedAnyTask) {
-        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-      }
+        taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+          case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+
+            // If the taskSet is unschedulable we kill an existing blacklisted executor/s and
+            // kick off an abortTimer which after waiting will abort the taskSet if we were
+            // unable to schedule any task from the taskSet.
+            // Note: We keep a track of schedulability on a per taskSet basis rather than on a
+            // per task basis.
+            val executor = hostToExecutors.valuesIterator.next().iterator.next()
+            logDebug("Killing executor because of task unschedulability: " + executor)
--- End diff --

I think this should probably be logInfo (unless there is something else similar at INFO level elsewhere)
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/22288#discussion_r216725731

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl(
           launchedAnyTask |= launchedTaskAtCurrentMaxLocality
         } while (launchedTaskAtCurrentMaxLocality)
       }
+
       if (!launchedAnyTask) {
-        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-      }
+        taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+          case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+
+            // If the taskSet is unschedulable we kill an existing blacklisted executor/s and
+            // kick off an abortTimer which after waiting will abort the taskSet if we were
+            // unable to schedule any task from the taskSet.
+            // Note: We keep a track of schedulability on a per taskSet basis rather than on a
+            // per task basis.
+            val executor = hostToExecutors.valuesIterator.next().iterator.next()
+            logDebug("Killing executor because of task unschedulability: " + executor)
+            blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor))
+
+            if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+              unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis()
--- End diff --

I'd include a logInfo here that spark can't schedule anything because of blacklisting, but it's going to try to kill blacklisted executors and acquire new ones. Also mention how long it will wait before giving up and the associated conf.
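For readers following the review, the expiry-map-plus-timer pattern under discussion can be sketched in isolation as below. This is an illustrative reconstruction, not Spark's actual code: the `String` key stands in for the `TaskSetManager`, and the 50 ms timeout is arbitrary rather than Spark's configured value.

```scala
import java.util.{Timer, TimerTask}
import scala.collection.mutable

object AbortTimerSketch {
  val timeoutMs = 50L
  // Maps an unschedulable "task set" to the time we first noticed it.
  val expiryTime = mutable.HashMap[String, Long]()
  val abortTimer = new Timer(true) // daemon thread, as a scheduler would use

  def scheduleAbort(taskSet: String, abort: () => Unit): Unit = {
    expiryTime(taskSet) = System.currentTimeMillis()
    abortTimer.schedule(new TimerTask {
      override def run(): Unit = {
        // Abort only if the entry is still present (no task got scheduled in
        // the meantime) and the timeout has genuinely elapsed; otherwise the
        // timer task simply cancels itself.
        if (expiryTime.get(taskSet).exists(_ + timeoutMs <= System.currentTimeMillis())) {
          abort()
        } else {
          this.cancel()
        }
      }
    }, timeoutMs)
  }
}
```

A successfully scheduled task would call `expiryTime.remove(taskSet)` before the timer fires, making the `run()` check fall through to `cancel()`, which is exactly the interplay the reviewers are discussing.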
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/22288#discussion_r216723175

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl(
           launchedAnyTask |= launchedTaskAtCurrentMaxLocality
         } while (launchedTaskAtCurrentMaxLocality)
       }
+
       if (!launchedAnyTask) {
-        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-      }
+        taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+          case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+
+            // If the taskSet is unschedulable we kill an existing blacklisted executor/s and
+            // kick off an abortTimer which after waiting will abort the taskSet if we were
+            // unable to schedule any task from the taskSet.
+            // Note: We keep a track of schedulability on a per taskSet basis rather than on a
+            // per task basis.
+            val executor = hostToExecutors.valuesIterator.next().iterator.next()
--- End diff --

`hostToExecutors.head._2.head`

just thinking "aloud" -- I guess taking an arbitrary executor here is OK, as we know there is some task that can't run on any executor. But I wonder if we could have some priority here -- eg. I'd much rather kill an executor which has been blacklisted for an entire stage or the whole app, rather than one that was blacklisted for just some task. Need to look into if there is an efficient way to keep that priority list, though.
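The priority squito floats here could look something like the sketch below, written over plain collections. Everything in it (`pickExecutorToKill`, `appBlacklisted`, `stageBlacklisted`) is hypothetical and not part of the PR; it only illustrates the ordering idea.

```scala
// Hypothetical sketch: prefer killing an executor blacklisted for the whole
// application, then one blacklisted for the current stage, before falling
// back to an arbitrary executor (which is what the PR currently does).
def pickExecutorToKill(
    hostToExecutors: Map[String, Seq[String]],
    appBlacklisted: Set[String],
    stageBlacklisted: Set[String]): Option[String] = {
  val all = hostToExecutors.valuesIterator.flatten.toSeq
  all.find(appBlacklisted)          // Set[String] is a String => Boolean
    .orElse(all.find(stageBlacklisted))
    .orElse(all.headOption)
}
```

Keeping such a priority efficiently is the open question squito raises; a naive scan like this is linear in the number of executors per scheduling round.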
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/22288#discussion_r216726755

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -623,8 +623,9 @@ private[spark] class TaskSetManager(
    *
    * It is possible that this taskset has become impossible to schedule *anywhere* due to the
    * blacklist. The most common scenario would be if there are fewer executors than
-   * spark.task.maxFailures. We need to detect this so we can fail the task set, otherwise the job
-   * will hang.
+   * spark.task.maxFailures. We need to detect this so we can avoid the job from being hung.
+   * If dynamic allocation is enabled we try to acquire new executor/s by killing the existing one.
+   * In case of static allocation we abort the taskSet immediately to fail the job.
--- End diff --

why do you want something different with static allocation? If you kill an executor, static allocation will also request a replacement.
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/22288#discussion_r216726323

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl(
           launchedAnyTask |= launchedTaskAtCurrentMaxLocality
         } while (launchedTaskAtCurrentMaxLocality)
       }
+
       if (!launchedAnyTask) {
-        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-      }
+        taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+          case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+
+            // If the taskSet is unschedulable we kill an existing blacklisted executor/s and
+            // kick off an abortTimer which after waiting will abort the taskSet if we were
+            // unable to schedule any task from the taskSet.
+            // Note: We keep a track of schedulability on a per taskSet basis rather than on a
+            // per task basis.
+            val executor = hostToExecutors.valuesIterator.next().iterator.next()
+            logDebug("Killing executor because of task unschedulability: " + executor)
+            blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor))
+
+            if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+              unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis()
+              abortTimer.schedule(new TimerTask() {
+                override def run() {
+                  if (unschedulableTaskSetToExpiryTime.contains(taskSet) &&
+                      (unschedulableTaskSetToExpiryTime(taskSet) +
+                        UNSCHEDULABLE_TASKSET_TIMEOUT_MS) <= clock.getTimeMillis()
+                  ) {
+                    logInfo("Cannot schedule any task because of complete blacklisting. " +
+                      "Wait time for scheduling expired. Aborting the application.")
+                    taskSet.abortSinceCompletelyBlacklisted(taskIndex.get)
+                  } else {
+                    this.cancel()
+                  }
+                }
+              }, UNSCHEDULABLE_TASKSET_TIMEOUT_MS)
+            }
+          case _ => // Do nothing.
+        }
+      } else {
+        // If a task was scheduled, we clear the expiry time for the taskSet. The abort timer
+        // checks this entry to decide if we want to abort the taskSet.
+        if (unschedulableTaskSetToExpiryTime.contains(taskSet)) {
--- End diff --

you can move this up to the `else` so it's an `else if`. Or you could also just call `remove` without checking `contains`; that avoids probing the map twice.
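The two variants squito contrasts behave identically but probe the hash map a different number of times: `mutable.HashMap.remove` already returns an `Option` and is a no-op on a missing key. A small standalone illustration, with string keys standing in for task sets:

```scala
import scala.collection.mutable

val expiry = mutable.HashMap("taskSet1" -> 1000L)

// Double probe: contains() hashes and looks up the key, then remove() does
// the same work again.
if (expiry.contains("taskSet1")) {
  expiry.remove("taskSet1")
}

// Single probe: remove() alone is safe whether or not the key exists, and
// returns what (if anything) was removed.
val removed = expiry.remove("taskSet1")   // None here: already removed above
val again = expiry.remove("no-such-key")  // None, and no error
```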
Github user dhruve commented on a diff in the pull request:
https://github.com/apache/spark/pull/22288#discussion_r215036162

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,54 @@ private[spark] class TaskSchedulerImpl(
           launchedAnyTask |= launchedTaskAtCurrentMaxLocality
         } while (launchedTaskAtCurrentMaxLocality)
       }
+
       if (!launchedAnyTask) {
-        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-      }
+        taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+          case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+            if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+              // If the taskSet is unschedulable we kill the existing blacklisted executor/s and
+              // kick off an abortTimer which after waiting will abort the taskSet if we were
+              // unable to get new executors and couldn't schedule a task from the taskSet.
+              // Note: We keep a track of schedulability on a per taskSet basis rather than on a
+              // per task basis.
+              if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+                hostToExecutors.valuesIterator.foreach(executors => executors.foreach({
+                  executor =>
+                    logDebug("Killing executor because of task unschedulability: " + executor)
+                    blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor))
--- End diff --

- To refresh executors, you need to enable `spark.blacklist.killBlacklistedExecutors`.
- I was thinking about it, killing all the executors is a little too harsh. Killing only a single executor would help mitigate this, although this would also lead to failing the running tasks on that executor.
Github user Ngone51 commented on a diff in the pull request:
https://github.com/apache/spark/pull/22288#discussion_r214720097

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,54 @@ private[spark] class TaskSchedulerImpl(
           launchedAnyTask |= launchedTaskAtCurrentMaxLocality
         } while (launchedTaskAtCurrentMaxLocality)
       }
+
       if (!launchedAnyTask) {
-        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-      }
+        taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+          case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+            if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+              // If the taskSet is unschedulable we kill the existing blacklisted executor/s and
+              // kick off an abortTimer which after waiting will abort the taskSet if we were
+              // unable to get new executors and couldn't schedule a task from the taskSet.
+              // Note: We keep a track of schedulability on a per taskSet basis rather than on a
+              // per task basis.
+              if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+                hostToExecutors.valuesIterator.foreach(executors => executors.foreach({
+                  executor =>
+                    logDebug("Killing executor because of task unschedulability: " + executor)
+                    blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor))
+                })
+                )
+                unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis()
+                abortTimer.schedule(new TimerTask() {
+                  override def run() {
+                    if (unschedulableTaskSetToExpiryTime.contains(taskSet) &&
+                        (unschedulableTaskSetToExpiryTime(taskSet) +
+                          UNSCHEDULABLE_TASKSET_TIMEOUT_MS) <= clock.getTimeMillis()
+                    ) {
+                      logInfo("Cannot schedule any task because of complete blacklisting. " +
+                        "Wait time for scheduling expired. Aborting the application.")
+                      taskSet.abortSinceCompletelyBlacklisted(taskIndex.get)
+                    } else {
+                      this.cancel()
+                    }
+                  }
+                }, UNSCHEDULABLE_TASKSET_TIMEOUT_MS)
+              }
+            } else {
+              // TODO: try acquiring new executors for static allocation before aborting.
--- End diff --

How? By waiting for other tasks to finish and release resources?
Github user Ngone51 commented on a diff in the pull request:
https://github.com/apache/spark/pull/22288#discussion_r214719743

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -414,9 +425,54 @@ private[spark] class TaskSchedulerImpl(
           launchedAnyTask |= launchedTaskAtCurrentMaxLocality
         } while (launchedTaskAtCurrentMaxLocality)
       }
+
       if (!launchedAnyTask) {
-        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
-      }
+        taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match {
+          case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable
+            if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+              // If the taskSet is unschedulable we kill the existing blacklisted executor/s and
+              // kick off an abortTimer which after waiting will abort the taskSet if we were
+              // unable to get new executors and couldn't schedule a task from the taskSet.
+              // Note: We keep a track of schedulability on a per taskSet basis rather than on a
+              // per task basis.
+              if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
+                hostToExecutors.valuesIterator.foreach(executors => executors.foreach({
+                  executor =>
+                    logDebug("Killing executor because of task unschedulability: " + executor)
+                    blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor))
--- End diff --

Seriously? You killed all executors? What if other taskSets' tasks are running on them? BTW, if you want to refresh executors, you also have to enable `spark.blacklist.killBlacklistedExecutors`.
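The difference Ngone51 objects to — this early revision iterating over every executor versus the later revision killing a single one — can be shown on plain collections. The names below are illustrative, not from the PR:

```scala
val hostToExecutors = Map("host1" -> Seq("exec1", "exec2"), "host2" -> Seq("exec3"))

// Early revision: every executor on every host is a kill candidate, which
// also fails tasks that other task sets may be running on those executors.
val killedByEarlyRevision = hostToExecutors.valuesIterator.flatten.toSeq

// Later revision: a single arbitrary executor, mirroring the PR's
// hostToExecutors.valuesIterator.next().iterator.next() selection.
val killedByLaterRevision = hostToExecutors.valuesIterator.next().headOption
```

Killing one executor still fails that executor's running tasks, as dhruve notes above, but it bounds the collateral damage to a single executor per scheduling round.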