
dingbei updated SPARK-33418:
> TaskSchedulerImpl: Check pending tasks in advance when resource offers
> ----------------------------------------------------------------------
>                 Key: SPARK-33418
>                 URL: https://issues.apache.org/jira/browse/SPARK-33418
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.0.1
>            Reporter: dingbei
>            Priority: Major
>         Attachments: 1.png, 2.png, 3_1.png, 3_2.png, 3_3.png, 3_4.png, 6_2.png
> It started with the need to launch a large number of Spark Streaming 
> receivers (custom receivers). *The launch time becomes very long once there 
> are more than 300 receivers.* 
> *Test preparation*
> environment: YARN
> spark.executor.cores: 2
> spark.executor.instances: first 200, then 500
> *Tests and data*
> First, we set the number of executors to 200, which means starting 200 
> receivers, and everything went well. It took about 50s to launch all 
> receivers. ({color:#ff0000}pic 1{color})
> Then we set the number of executors to 500, which means starting 500 
> receivers. The launch time grew to around 5 minutes. ({color:#ff0000}pic 2{color})
>  *Digging into the source code*
> I used thread dumps to see which methods took a relatively long 
> time ({color:#ff0000}pic 3{color}), then added logging between those methods. 
> In the end I found that the loop in 
> {color:#00875a}TaskSchedulerImpl.resourceOffers{color} accounts for most of 
> the duration ({color:#de350b}red color{color}).
> {color:#00875a}org.apache.spark.scheduler.TaskSchedulerImpl{color}
> {code:java}
> def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
>   // Mark each slave as alive and remember its hostname
>   // Also track if new executor is added
>   var newExecAvail = false
>   for (o <- offers) {
>     if (!hostToExecutors.contains(o.host)) {
>       hostToExecutors(o.host) = new HashSet[String]()
>     }
>     if (!executorIdToRunningTaskIds.contains(o.executorId)) {
>       hostToExecutors(o.host) += o.executorId
>       executorAdded(o.executorId, o.host)
>       executorIdToHost(o.executorId) = o.host
>       executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
>       newExecAvail = true
>     }
>   }
>   val hosts = offers.map(_.host).distinct
>   for ((host, Some(rack)) <- hosts.zip(getRacksForHosts(hosts))) {
>     hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += host
>   }
>   // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
>   // this here to avoid a separate thread and added synchronization overhead, and also because
>   // updating the blacklist is only relevant when task offers are being made.
>   blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
>   val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
>     offers.filter { offer =>
>       !blacklistTracker.isNodeBlacklisted(offer.host) &&
>         !blacklistTracker.isExecutorBlacklisted(offer.executorId)
>     }
>   }.getOrElse(offers)
>   val shuffledOffers = shuffleOffers(filteredOffers)
>   // Build a list of tasks to assign to each worker.
>   val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
>   val availableResources = shuffledOffers.map(_.resources).toArray
>   val availableCpus = shuffledOffers.map(o => o.cores).toArray
>   val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
>   for (taskSet <- sortedTaskSets) {
>     logDebug("parentName: %s, name: %s, runningTasks: %s".format(
>       taskSet.parent.name, taskSet.name, taskSet.runningTasks))
>     if (newExecAvail) {
>       taskSet.executorAdded()
>     }
>   }
>   // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
>   // of locality levels so that it gets a chance to launch local tasks on all of them.
>   // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, 
>   for (taskSet <- sortedTaskSets) {
>     // we only need to calculate available slots if using barrier scheduling, otherwise the
>     // value is -1
>     val availableSlots = if (taskSet.isBarrier) {
>       val availableResourcesAmount = availableResources.map { resourceMap =>
>         // note that the addresses here have been expanded according to the numParts
>         resourceMap.map { case (name, addresses) => (name, addresses.length) }
>       }
>       calculateAvailableSlots(this, availableCpus, availableResourcesAmount)
>     } else {
>       -1
>     }
>     // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
>     if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
>       // Skip the launch process.
>       // TODO SPARK-24819 If the job requires more slots than available (both busy and free
>       // slots), fail the job on submit.
>       logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
>         s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
>         s"number of available slots is $availableSlots.")
>     } else {
>       var launchedAnyTask = false
>       // Record all the executor IDs assigned barrier tasks on.
>       val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
>       for (currentMaxLocality <- taskSet.myLocalityLevels) {
>         var launchedTaskAtCurrentMaxLocality = false
>         do {
>           launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
>             currentMaxLocality, shuffledOffers, availableCpus,
>             availableResources, tasks, addressesWithDescs)
>           launchedAnyTask |= launchedTaskAtCurrentMaxLocality
>         } while (launchedTaskAtCurrentMaxLocality)
>       }
>       if (!launchedAnyTask) {
>         taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
>             // If the taskSet is unschedulable we try to find an existing idle blacklisted
>             // executor. If we cannot find one, we abort immediately. Else we kill the idle
>             // executor and kick off an abortTimer which if it doesn't schedule a task within the
>             // the timeout will abort the taskSet if we were unable to schedule any task from the
>             // taskSet.
>             // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
>             // task basis.
>             // Note 2: The taskSet can still be aborted when there are more than one idle
>             // blacklisted executors and dynamic allocation is on. This can happen when a killed
>             // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
>             // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
>             // timer to expire and abort the taskSet.
>             executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
>               case Some ((executorId, _)) =>
>                 if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
>                   blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
>                   val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
>                   unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
>                   logInfo(s"Waiting for $timeout ms for completely "
>                     + s"blacklisted task to be schedulable again before aborting $taskSet.")
>                   abortTimer.schedule(
>                     createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
>                 }
>               case None => // Abort Immediately
>                 logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
>                   s" executors can be found to kill. Aborting $taskSet." )
>                 taskSet.abortSinceCompletelyBlacklisted(taskIndex)
>             }
>         }
>       } else {
>         // We want to defer killing any taskSets as long as we have a non blacklisted executor
>         // which can be used to schedule a task from any active taskSets. This ensures that the
>         // job can make progress.
>         // Note: It is theoretically possible that a taskSet never gets scheduled on a
>         // non-blacklisted executor and the abort timer doesn't kick in because of a constant
>         // submission of new TaskSets. See the PR for more details.
>         if (unschedulableTaskSetToExpiryTime.nonEmpty) {
>           logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +
>             "recently scheduled.")
>           unschedulableTaskSetToExpiryTime.clear()
>         }
>       }
>       if (launchedAnyTask && taskSet.isBarrier) {
>         // Check whether the barrier tasks are partially launched.
>         // TODO SPARK-24818 handle the assert failure case (that can happen when some locality
>         // requirements are not fulfilled, and we should revert the launched tasks).
>         if (addressesWithDescs.size != taskSet.numTasks) {
>           val errorMsg =
>             s"Fail resource offers for barrier stage ${taskSet.stageId} because only " +
>               s"${addressesWithDescs.size} out of a total number of ${taskSet.numTasks}" +
>               s" tasks got resource offers. This happens because barrier execution currently " +
>               s"does not work gracefully with delay scheduling. We highly recommend you to " +
>               s"disable delay scheduling by setting spark.locality.wait=0 as a workaround if " +
>               s"you see this error frequently."
>           logWarning(errorMsg)
>           taskSet.abort(errorMsg)
>           throw new SparkException(errorMsg)
>         }
>         // materialize the barrier coordinator.
>         maybeInitBarrierCoordinator()
>         // Update the taskInfos into all the barrier task properties.
>         val addressesStr = addressesWithDescs
>           // Addresses ordered by partitionId
>           .sortBy(_._2.partitionId)
>           .map(_._1)
>           .mkString(",")
>         addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))
>         logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +
>           s"stage ${taskSet.stageId}.")
>       }
>     }
>   }
>   // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
>   // launched within a configured time.
>   if (tasks.nonEmpty) {
>     hasLaunchedTask = true
>   }
>   return tasks
> }
> {code}
> *Explanation and Solution*
> The loop in {color:#00875a}TaskSchedulerImpl.resourceOffers{color} iterates 
> over all non-zombie TaskSetManagers in the Pool's queue. Normally this queue 
> is small, because a TaskSetManager is removed once all of its tasks are 
> done. But in Spark Streaming each receiver is wrapped as a non-stop job, 
> which means its TaskSetManager stays in the queue until the application 
> finishes. For example, when the 10th receiver is being launched the queue 
> holds 10 entries, so the loop iterates 10 times; when the 500th receiver is 
> being launched it iterates 500 times. However, 499 of those iterations are 
> unnecessary, because their tasks are already running.
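
A rough way to see the cost described above: if each receiver launch triggers at least one pass over the queue, and the k-th launch visits k TaskSetManagers, the total number of visits grows quadratically with the number of receivers. The sketch below is a hypothetical model, not Spark code. The object and method names are made up for illustration, and one pass per launch is a simplifying assumption.

```scala
// Hypothetical model, not Spark code: count how many TaskSetManagers the
// resourceOffers loop visits while n receivers are launched one after another.
object ReceiverLaunchCost {
  // Assumption: launching the k-th receiver scans a queue that already holds
  // k entries, so the total is 1 + 2 + ... + n.
  def visitsWithoutFilter(n: Int): Long = (1L to n.toLong).sum

  // Assumption: entries with no pending tasks are skipped, so each launch
  // visits only the newly submitted TaskSetManager.
  def visitsWithFilter(n: Int): Long = n.toLong

  def main(args: Array[String]): Unit = {
    for (n <- Seq(200, 500)) {
      println(s"n=$n unfiltered=${visitsWithoutFilter(n)} filtered=${visitsWithFilter(n)}")
    }
  }
}
```

Under this model, 500 receivers cost 125250 visits against 20100 for 200 receivers, roughly a 6x increase, which is in the same range as the reported jump from about 50s to about 5 minutes.
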
> Digging deeper into the code, I found that whether a TaskSetManager still 
> has pending tasks is decided in 
> {color:#00875a}TaskSetManager.dequeueTaskFromList{color}, which is far away 
> from the loop in {color:#00875a}TaskSchedulerImpl.resourceOffers{color}.
> {code:java}
> private def dequeueTaskFromList(
>     execId: String,
>     host: String,
>     list: ArrayBuffer[Int],
>     speculative: Boolean = false): Option[Int] = {
>   var indexOffset = list.size
>   while (indexOffset > 0) {
>     indexOffset -= 1
>     val index = list(indexOffset)
>     if (!isTaskBlacklistedOnExecOrNode(index, execId, host) &&
>         !(speculative && hasAttemptOnHost(index, host))) {
>       // This should almost always be list.trimEnd(1) to remove tail
>       list.remove(indexOffset)
>       // Speculatable task should only be launched when at most one copy of the
>       // original task is running
>       if (!successful(index)) {
>         if (copiesRunning(index) == 0) {
>           return Some(index)
>         } else if (speculative && copiesRunning(index) == 1) {
>           return Some(index)
>         }
>       }
>     }
>   }
>   None
> }{code}
> So I moved the pending-task check up into the loop in 
> {color:#00875a}TaskSchedulerImpl.resourceOffers{color}, and I also took 
> speculation mode into account. This keeps TaskSetManagers that have no tasks 
> left to schedule out of the loop, which saves a lot of unnecessary 
> iterations.
> {code:java}
> def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
>   // Mark each slave as alive and remember its hostname
>   // Also track if new executor is added
>   var newExecAvail = false
>   for (o <- offers) {
>     if (!hostToExecutors.contains(o.host)) {
>       hostToExecutors(o.host) = new HashSet[String]()
>     }
>     if (!executorIdToRunningTaskIds.contains(o.executorId)) {
>       hostToExecutors(o.host) += o.executorId
>       executorAdded(o.executorId, o.host)
>       executorIdToHost(o.executorId) = o.host
>       executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
>       newExecAvail = true
>     }
>   }
>   val hosts = offers.map(_.host).distinct
>   for ((host, Some(rack)) <- hosts.zip(getRacksForHosts(hosts))) {
>     hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += host
>   }
>   // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
>   // this here to avoid a separate thread and added synchronization overhead, and also because
>   // updating the blacklist is only relevant when task offers are being made.
>   blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
>   val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
>     offers.filter { offer =>
>       !blacklistTracker.isNodeBlacklisted(offer.host) &&
>         !blacklistTracker.isExecutorBlacklisted(offer.executorId)
>     }
>   }.getOrElse(offers)
>   val shuffledOffers = shuffleOffers(filteredOffers)
>   // Build a list of tasks to assign to each worker.
>   val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
>   val availableResources = shuffledOffers.map(_.resources).toArray
>   val availableCpus = shuffledOffers.map(o => o.cores).toArray
>   val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
>   for (taskSet <- sortedTaskSets) {
>     logDebug("parentName: %s, name: %s, runningTasks: %s".format(
>       taskSet.parent.name, taskSet.name, taskSet.runningTasks))
>     if (newExecAvail) {
>       taskSet.executorAdded()
>     }
>   }
>   // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
>   // of locality levels so that it gets a chance to launch local tasks on all of them.
>   // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, 
>   for (taskSet <- sortedTaskSets if hasPendingTasksToSched(taskSet)) {
>     // we only need to calculate available slots if using barrier scheduling, otherwise the
>     // value is -1
>     val availableSlots = if (taskSet.isBarrier) {
>       val availableResourcesAmount = availableResources.map { resourceMap =>
>         // note that the addresses here have been expanded according to the numParts
>         resourceMap.map { case (name, addresses) => (name, addresses.length) }
>       }
>       calculateAvailableSlots(this, availableCpus, availableResourcesAmount)
>     } else {
>       -1
>     }
>     // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
>     if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
>       // Skip the launch process.
>       // TODO SPARK-24819 If the job requires more slots than available (both busy and free
>       // slots), fail the job on submit.
>       logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
>         s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
>         s"number of available slots is $availableSlots.")
>     } else {
>       var launchedAnyTask = false
>       // Record all the executor IDs assigned barrier tasks on.
>       val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
>       for (currentMaxLocality <- taskSet.myLocalityLevels) {
>         var launchedTaskAtCurrentMaxLocality = false
>         do {
>           launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
>             currentMaxLocality, shuffledOffers, availableCpus,
>             availableResources, tasks, addressesWithDescs)
>           launchedAnyTask |= launchedTaskAtCurrentMaxLocality
>         } while (launchedTaskAtCurrentMaxLocality)
>       }
>       if (!launchedAnyTask) {
>         taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
>             // If the taskSet is unschedulable we try to find an existing idle blacklisted
>             // executor. If we cannot find one, we abort immediately. Else we kill the idle
>             // executor and kick off an abortTimer which if it doesn't schedule a task within the
>             // the timeout will abort the taskSet if we were unable to schedule any task from the
>             // taskSet.
>             // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
>             // task basis.
>             // Note 2: The taskSet can still be aborted when there are more than one idle
>             // blacklisted executors and dynamic allocation is on. This can happen when a killed
>             // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
>             // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
>             // timer to expire and abort the taskSet.
>             executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
>               case Some ((executorId, _)) =>
>                 if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
>                   blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
>                   val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
>                   unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
>                   logInfo(s"Waiting for $timeout ms for completely "
>                     + s"blacklisted task to be schedulable again before aborting $taskSet.")
>                   abortTimer.schedule(
>                     createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
>                 }
>               case None => // Abort Immediately
>                 logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
>                   s" executors can be found to kill. Aborting $taskSet." )
>                 taskSet.abortSinceCompletelyBlacklisted(taskIndex)
>             }
>         }
>       } else {
>         // We want to defer killing any taskSets as long as we have a non blacklisted executor
>         // which can be used to schedule a task from any active taskSets. This ensures that the
>         // job can make progress.
>         // Note: It is theoretically possible that a taskSet never gets scheduled on a
>         // non-blacklisted executor and the abort timer doesn't kick in because of a constant
>         // submission of new TaskSets. See the PR for more details.
>         if (unschedulableTaskSetToExpiryTime.nonEmpty) {
>           logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +
>             "recently scheduled.")
>           unschedulableTaskSetToExpiryTime.clear()
>         }
>       }
>       if (launchedAnyTask && taskSet.isBarrier) {
>         // Check whether the barrier tasks are partially launched.
>         // TODO SPARK-24818 handle the assert failure case (that can happen when some locality
>         // requirements are not fulfilled, and we should revert the launched tasks).
>         if (addressesWithDescs.size != taskSet.numTasks) {
>           val errorMsg =
>             s"Fail resource offers for barrier stage ${taskSet.stageId} because only " +
>               s"${addressesWithDescs.size} out of a total number of ${taskSet.numTasks}" +
>               s" tasks got resource offers. This happens because barrier execution currently " +
>               s"does not work gracefully with delay scheduling. We highly recommend you to " +
>               s"disable delay scheduling by setting spark.locality.wait=0 as a workaround if " +
>               s"you see this error frequently."
>           logWarning(errorMsg)
>           taskSet.abort(errorMsg)
>           throw new SparkException(errorMsg)
>         }
>         // materialize the barrier coordinator.
>         maybeInitBarrierCoordinator()
>         // Update the taskInfos into all the barrier task properties.
>         val addressesStr = addressesWithDescs
>           // Addresses ordered by partitionId
>           .sortBy(_._2.partitionId)
>           .map(_._1)
>           .mkString(",")
>         addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))
>         logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +
>           s"stage ${taskSet.stageId}.")
>       }
>     }
>   }
>   // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
>   // launched within a configured time.
>   if (tasks.nonEmpty) {
>     hasLaunchedTask = true
>   }
>   return tasks
> }
> def hasPendingTasksToSched(taskSet: TaskSetManager): Boolean = {
>   taskSet.tasks.zipWithIndex
>     .exists(t =>
>       (!taskSet.successful(t._2) && taskSet.copiesRunning(t._2) == 0)
>         || taskSet.speculatableTasks.contains(t._2))
> }
> {code}
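
The pending-task check can be exercised in isolation. The following is a minimal sketch that models only the three pieces of TaskSetManager state the predicate reads. TaskSetModel and PendingCheck are hypothetical names introduced for illustration, and the sketch iterates over task indices directly instead of using tasks.zipWithIndex, which should be equivalent for this purpose.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the TaskSetManager state used by the check:
// per-task success flags, per-task running-copy counts, and the set of
// task indices currently marked as speculatable.
final case class TaskSetModel(
    successful: Array[Boolean],
    copiesRunning: Array[Int],
    speculatableTasks: mutable.HashSet[Int] = mutable.HashSet.empty[Int]) {
  def numTasks: Int = successful.length
}

object PendingCheck {
  // True if at least one task is neither finished nor running, or is
  // waiting to be launched as a speculative copy.
  def hasPendingTasksToSched(ts: TaskSetModel): Boolean =
    (0 until ts.numTasks).exists { i =>
      (!ts.successful(i) && ts.copiesRunning(i) == 0) ||
        ts.speculatableTasks.contains(i)
    }
}
```

With this model, a receiver whose single task is already running (successful = false, copiesRunning = 1) is skipped, while a freshly submitted one (copiesRunning = 0) is still offered resources.
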
> *Conclusion*
> We managed to reduce the launch time of all receivers to a stable 50s or so 
> (500 receivers). I think the Spark contributors may not have considered a 
> scenario where many jobs run at the same time, which I know is unusual, but 
> this would still be a good improvement.

