[ 
https://issues.apache.org/jira/browse/SPARK-33418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dingbei updated SPARK-33418:
----------------------------
    Description: 
This started from the need to launch a large number of Spark Streaming 
receivers (custom receivers). *The launch time becomes very long once there 
are more than 300 receivers.* 

*Tests preparation*

environment: Yarn

spark.executor.cores : 2

spark.executor.instances : first time 200, then 500

*Tests and data*

At first, we set the number of executors to 200 which means to start 200 
receivers and everything goes well. It takes about 50s to launch all 
receivers.({color:#ff0000}pic 1{color})

Then we set the number of executors to 500 which means to start 500 receivers. 
The launch time became around 5 mins.({color:#ff0000}pic 2{color})

 *Dig into source code*

I used thread dumps to check which methods take a relatively long 
time ({color:#ff0000}pic 3{color}), then added logging between these methods. 
In the end I found that the loop in 
{color:#00875a}TaskSchedulerImpl.resourceOffers{color} accounts for most 
of the duration ({color:#de350b}red color{color}).

{color:#00875a}org.apache.spark.scheduler.TaskSchedulerImpl{color}

 
{code:java}
/**
 * Processes one round of resource offers and decides which tasks go to which offer.
 *
 * Runs under `synchronized`. It first registers every host and executor in `offers` that has
 * not been seen before, drops offers whose node or executor is currently blacklisted, passes
 * the remaining offers through `shuffleOffers`, and then walks the non-zombie task sets in
 * the order returned by `rootPool.getSortedTaskSetQueue`, offering each one the resources at
 * every one of its locality levels until no further task can be launched.
 *
 * Barrier task sets are skipped for this round when fewer slots are available than the task
 * set has tasks, and are aborted if only part of their tasks received an offer.
 *
 * @param offers the worker offers available in this round
 * @return one sequence of task descriptions per filtered and shuffled offer
 * @throws SparkException if a barrier task set was only partially launched
 */
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) = new HashSet[String]()
    }
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
      newExecAvail = true
    }
  }
  // Only hosts for which a rack is known (Some(rack)) are recorded in hostsByRack.
  val hosts = offers.map(_.host).distinct
  for ((host, Some(rack)) <- hosts.zip(getRacksForHosts(hosts))) {
    hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += host
  }

  // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
  // this here to avoid a separate thread and added synchronization overhead, and also because
  // updating the blacklist is only relevant when task offers are being made.
  blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())

  val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
    offers.filter { offer =>
      !blacklistTracker.isNodeBlacklisted(offer.host) &&
        !blacklistTracker.isExecutorBlacklisted(offer.executorId)
    }
  }.getOrElse(offers)

  val shuffledOffers = shuffleOffers(filteredOffers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
  val availableResources = shuffledOffers.map(_.resources).toArray
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  for (taskSet <- sortedTaskSets) {
    // we only need to calculate available slots if using barrier scheduling, otherwise the
    // value is -1
    val availableSlots = if (taskSet.isBarrier) {
      val availableResourcesAmount = availableResources.map { resourceMap =>
        // note that the addresses here have been expanded according to the numParts
        resourceMap.map { case (name, addresses) => (name, addresses.length) }
      }
      calculateAvailableSlots(this, availableCpus, availableResourcesAmount)
    } else {
      -1
    }
    // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
    if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
      // Skip the launch process.
      // TODO SPARK-24819 If the job requires more slots than available (both busy and free
      // slots), fail the job on submit.
      logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
        s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
        s"number of available slots is $availableSlots.")
    } else {
      var launchedAnyTask = false
      // Record all the executor IDs assigned barrier tasks on.
      val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
      for (currentMaxLocality <- taskSet.myLocalityLevels) {
        var launchedTaskAtCurrentMaxLocality = false
        // Keep offering at this locality level for as long as the previous pass launched a task.
        do {
          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
            currentMaxLocality, shuffledOffers, availableCpus,
            availableResources, tasks, addressesWithDescs)
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
      }

      if (!launchedAnyTask) {
        taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
            // If the taskSet is unschedulable we try to find an existing idle blacklisted
            // executor. If we cannot find one, we abort immediately. Else we kill the idle
            // executor and kick off an abortTimer which if it doesn't schedule a task within
            // the timeout will abort the taskSet if we were unable to schedule any task from the
            // taskSet.
            // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
            // task basis.
            // Note 2: The taskSet can still be aborted when there are more than one idle
            // blacklisted executors and dynamic allocation is on. This can happen when a killed
            // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
            // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
            // timer to expire and abort the taskSet.
            executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
              case Some ((executorId, _)) =>
                if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
                  blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))

                  val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
                  unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
                  logInfo(s"Waiting for $timeout ms for completely "
                    + s"blacklisted task to be schedulable again before aborting $taskSet.")
                  abortTimer.schedule(
                    createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
                }
              case None => // Abort Immediately
                logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
                  s" executors can be found to kill. Aborting $taskSet." )
                taskSet.abortSinceCompletelyBlacklisted(taskIndex)
            }
        }
      } else {
        // We want to defer killing any taskSets as long as we have a non blacklisted executor
        // which can be used to schedule a task from any active taskSets. This ensures that the
        // job can make progress.
        // Note: It is theoretically possible that a taskSet never gets scheduled on a
        // non-blacklisted executor and the abort timer doesn't kick in because of a constant
        // submission of new TaskSets. See the PR for more details.
        if (unschedulableTaskSetToExpiryTime.nonEmpty) {
          logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +
            "recently scheduled.")
          unschedulableTaskSetToExpiryTime.clear()
        }
      }

      if (launchedAnyTask && taskSet.isBarrier) {
        // Check whether the barrier tasks are partially launched.
        // TODO SPARK-24818 handle the assert failure case (that can happen when some locality
        // requirements are not fulfilled, and we should revert the launched tasks).
        if (addressesWithDescs.size != taskSet.numTasks) {
          val errorMsg =
            s"Fail resource offers for barrier stage ${taskSet.stageId} because only " +
              s"${addressesWithDescs.size} out of a total number of ${taskSet.numTasks}" +
              s" tasks got resource offers. This happens because barrier execution currently " +
              s"does not work gracefully with delay scheduling. We highly recommend you to " +
              s"disable delay scheduling by setting spark.locality.wait=0 as a workaround if " +
              s"you see this error frequently."
          logWarning(errorMsg)
          taskSet.abort(errorMsg)
          throw new SparkException(errorMsg)
        }

        // materialize the barrier coordinator.
        maybeInitBarrierCoordinator()

        // Update the taskInfos into all the barrier task properties.
        val addressesStr = addressesWithDescs
          // Addresses ordered by partitionId
          .sortBy(_._2.partitionId)
          .map(_._1)
          .mkString(",")
        addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))

        logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +
          s"stage ${taskSet.stageId}.")
      }
    }
  }

  // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
  // launched within a configured time.
  // NOTE(review): `tasks` holds one buffer per shuffled offer, so `nonEmpty` appears to be true
  // whenever at least one offer survives filtering, even if every buffer is empty. Confirm
  // that this is the intended meaning of hasLaunchedTask.
  if (tasks.nonEmpty) {
    hasLaunchedTask = true
  }
  tasks
}
{code}
 

*Explanation and Solution*

The loop in {color:#00875a}TaskSchedulerImpl.resourceOffers{color} iterates 
over all non-zombie TaskSetManagers in the Pool's queue. Normally this queue 
is small, because a TaskSetManager is removed once all of its tasks are done. 
But in Spark Streaming jobs, as we all know, each receiver is wrapped in a 
non-stop job, which means its TaskSetManager stays in the queue until the 
application finishes. For example, when the 10th receiver is being launched 
the queue holds 10 TaskSetManagers, so the loop iterates 10 times; when the 
500th receiver is being launched, it iterates 500 times. However, 499 of 
those iterations are unnecessary, because their tasks are already 
running.  

When I dug deeper into the code, I found that whether a TaskSetManager still 
has pending tasks left is decided in

 {color:#00875a}TaskSetManager.dequeueTaskFromList{color}, which is far away 
from the loop in {color:#00875a}TaskSchedulerImpl.resourceOffers{color}.
{code:java}
/**
 * Scans `list` from its last element towards its first and dequeues a launchable task index.
 *
 * Every entry that is not blacklisted for `execId`/`host` and, in speculative mode, has no
 * attempt on `host` yet is removed from `list` as it is visited. The first removed entry that
 * has not succeeded and has no running copy (or exactly one running copy when `speculative`
 * is set) is returned.
 *
 * @param execId executor the task would run on
 * @param host host the task would run on
 * @param list candidate task indices; mutated in place
 * @param speculative whether a speculative copy is being dequeued
 * @return the chosen task index, or None when no entry qualifies
 */
private def dequeueTaskFromList(
    execId: String,
    host: String,
    list: ArrayBuffer[Int],
    speculative: Boolean = false): Option[Int] = {
  var chosen: Option[Int] = None
  var pos = list.size - 1
  while (chosen.isEmpty && pos >= 0) {
    val taskIndex = list(pos)
    // The host-attempt check only matters (and is only evaluated) for speculative copies of
    // tasks that are not blacklisted here.
    val eligible =
      if (isTaskBlacklistedOnExecOrNode(taskIndex, execId, host)) false
      else !speculative || !hasAttemptOnHost(taskIndex, host)
    if (eligible) {
      // This should almost always be list.trimEnd(1) to remove tail
      list.remove(pos)
      // Speculatable task should only be launched when at most one copy of the
      // original task is running
      if (!successful(taskIndex)) {
        val running = copiesRunning(taskIndex)
        if (running == 0 || (speculative && running == 1)) {
          chosen = Some(taskIndex)
        }
      }
    }
    pos -= 1
  }
  chosen
}{code}
So I moved the pending-tasks check ahead into the loop in 
{color:#00875a}TaskSchedulerImpl.resourceOffers{color}, and I also considered 
speculation mode. This prevents TaskSetManagers that have no pending tasks left 
(all tasks finished or already running) from entering this loop, which saves a 
lot of unnecessary iterations.
{code:java}
/**
 * Processes one round of resource offers and decides which tasks go to which offer.
 *
 * Runs under `synchronized`. It first registers every host and executor in `offers` that has
 * not been seen before, drops offers whose node or executor is currently blacklisted, passes
 * the remaining offers through `shuffleOffers`, and then walks the non-zombie task sets in
 * the order returned by `rootPool.getSortedTaskSetQueue`. Task sets for which
 * `hasPendingTasksToSched` is false are left out of the scheduling loop; every other task set
 * is offered the resources at each of its locality levels until no further task can be
 * launched.
 *
 * Barrier task sets are skipped for this round when fewer slots are available than the task
 * set has tasks, and are aborted if only part of their tasks received an offer.
 *
 * @param offers the worker offers available in this round
 * @return one sequence of task descriptions per filtered and shuffled offer
 * @throws SparkException if a barrier task set was only partially launched
 */
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) = new HashSet[String]()
    }
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
      newExecAvail = true
    }
  }
  // Only hosts for which a rack is known (Some(rack)) are recorded in hostsByRack.
  val hosts = offers.map(_.host).distinct
  for ((host, Some(rack)) <- hosts.zip(getRacksForHosts(hosts))) {
    hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += host
  }

  // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
  // this here to avoid a separate thread and added synchronization overhead, and also because
  // updating the blacklist is only relevant when task offers are being made.
  blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())

  val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
    offers.filter { offer =>
      !blacklistTracker.isNodeBlacklisted(offer.host) &&
        !blacklistTracker.isExecutorBlacklisted(offer.executorId)
    }
  }.getOrElse(offers)

  val shuffledOffers = shuffleOffers(filteredOffers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
  val availableResources = shuffledOffers.map(_.resources).toArray
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  // Task sets without anything left to schedule are filtered out up front so that long-lived
  // task sets whose tasks are all running do not cost an iteration on every offer round.
  for (taskSet <- sortedTaskSets if hasPendingTasksToSched(taskSet)) {
    // we only need to calculate available slots if using barrier scheduling, otherwise the
    // value is -1
    val availableSlots = if (taskSet.isBarrier) {
      val availableResourcesAmount = availableResources.map { resourceMap =>
        // note that the addresses here have been expanded according to the numParts
        resourceMap.map { case (name, addresses) => (name, addresses.length) }
      }
      calculateAvailableSlots(this, availableCpus, availableResourcesAmount)
    } else {
      -1
    }
    // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
    if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
      // Skip the launch process.
      // TODO SPARK-24819 If the job requires more slots than available (both busy and free
      // slots), fail the job on submit.
      logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
        s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
        s"number of available slots is $availableSlots.")
    } else {
      var launchedAnyTask = false
      // Record all the executor IDs assigned barrier tasks on.
      val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
      for (currentMaxLocality <- taskSet.myLocalityLevels) {
        var launchedTaskAtCurrentMaxLocality = false
        // Keep offering at this locality level for as long as the previous pass launched a task.
        do {
          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
            currentMaxLocality, shuffledOffers, availableCpus,
            availableResources, tasks, addressesWithDescs)
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
      }

      if (!launchedAnyTask) {
        taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
            // If the taskSet is unschedulable we try to find an existing idle blacklisted
            // executor. If we cannot find one, we abort immediately. Else we kill the idle
            // executor and kick off an abortTimer which if it doesn't schedule a task within
            // the timeout will abort the taskSet if we were unable to schedule any task from the
            // taskSet.
            // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
            // task basis.
            // Note 2: The taskSet can still be aborted when there are more than one idle
            // blacklisted executors and dynamic allocation is on. This can happen when a killed
            // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
            // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
            // timer to expire and abort the taskSet.
            executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
              case Some ((executorId, _)) =>
                if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
                  blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))

                  val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
                  unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
                  logInfo(s"Waiting for $timeout ms for completely "
                    + s"blacklisted task to be schedulable again before aborting $taskSet.")
                  abortTimer.schedule(
                    createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
                }
              case None => // Abort Immediately
                logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
                  s" executors can be found to kill. Aborting $taskSet." )
                taskSet.abortSinceCompletelyBlacklisted(taskIndex)
            }
        }
      } else {
        // We want to defer killing any taskSets as long as we have a non blacklisted executor
        // which can be used to schedule a task from any active taskSets. This ensures that the
        // job can make progress.
        // Note: It is theoretically possible that a taskSet never gets scheduled on a
        // non-blacklisted executor and the abort timer doesn't kick in because of a constant
        // submission of new TaskSets. See the PR for more details.
        if (unschedulableTaskSetToExpiryTime.nonEmpty) {
          logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +
            "recently scheduled.")
          unschedulableTaskSetToExpiryTime.clear()
        }
      }

      if (launchedAnyTask && taskSet.isBarrier) {
        // Check whether the barrier tasks are partially launched.
        // TODO SPARK-24818 handle the assert failure case (that can happen when some locality
        // requirements are not fulfilled, and we should revert the launched tasks).
        if (addressesWithDescs.size != taskSet.numTasks) {
          val errorMsg =
            s"Fail resource offers for barrier stage ${taskSet.stageId} because only " +
              s"${addressesWithDescs.size} out of a total number of ${taskSet.numTasks}" +
              s" tasks got resource offers. This happens because barrier execution currently " +
              s"does not work gracefully with delay scheduling. We highly recommend you to " +
              s"disable delay scheduling by setting spark.locality.wait=0 as a workaround if " +
              s"you see this error frequently."
          logWarning(errorMsg)
          taskSet.abort(errorMsg)
          throw new SparkException(errorMsg)
        }

        // materialize the barrier coordinator.
        maybeInitBarrierCoordinator()

        // Update the taskInfos into all the barrier task properties.
        val addressesStr = addressesWithDescs
          // Addresses ordered by partitionId
          .sortBy(_._2.partitionId)
          .map(_._1)
          .mkString(",")
        addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))

        logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +
          s"stage ${taskSet.stageId}.")
      }
    }
  }

  // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
  // launched within a configured time.
  // NOTE(review): `tasks` holds one buffer per shuffled offer, so `nonEmpty` appears to be true
  // whenever at least one offer survives filtering, even if every buffer is empty. Confirm
  // that this is the intended meaning of hasLaunchedTask.
  if (tasks.nonEmpty) {
    hasLaunchedTask = true
  }
  tasks
}


/**
 * Reports whether `taskSet` still has at least one task worth offering resources to.
 *
 * A task index counts as pending when it has not succeeded and has no running copy, or when
 * it is listed in `taskSet.speculatableTasks`.
 *
 * @param taskSet the task set manager to inspect
 * @return true if any task index of `taskSet` satisfies the condition above
 */
def hasPendingTasksToSched(taskSet: TaskSetManager): Boolean = {
  // Walk the indices directly: `zipWithIndex` would build a zipped copy of all tasks on every
  // call, and only the index is ever used.
  taskSet.tasks.indices.exists { index =>
    (!taskSet.successful(index) && taskSet.copiesRunning(index) == 0) ||
      taskSet.speculatableTasks.contains(index)
  }
}
{code}
*Conclusion*

We managed to reduce the launch time of all receivers to a stable ~50s 
(500 receivers). I think the Spark contributors may not have considered a 
scenario where many jobs run at the same time, which I know is unusual, but 
this would still be a good improvement.

  was:
This started from the need to launch a large number of Spark Streaming 
receivers (custom receivers). *The launch time becomes very long once there 
are more than 300 receivers.* 

*Tests preparation*

environment: Yarn

spark.executor.cores : 2

spark.executor.instances : first time 200, then 500

*Tests and data*

At first, we set the number of executors to 200 which means to start 200 
receivers and everything goes well. It takes about 50s to launch all 
receivers.({color:#ff0000}pic 1{color})

Then we set the number of executors to 500 which means to start 500 receivers. 
The launch time became around 5 mins.({color:#ff0000}pic 2{color})

 *Dig into source code*

I used thread dumps to check which methods take a relatively long 
time ({color:#ff0000}pic 3{color}), then added logging between these methods. 
In the end I found that the loop in 
{color:#00875a}TaskSchedulerImpl.resourceOffers{color} accounts for most 
of the duration ({color:#de350b}red color{color}).

{color:#00875a}org.apache.spark.scheduler.TaskSchedulerImpl{color}

 
{code:java}
/**
 * Processes one round of resource offers and decides which tasks go to which offer.
 *
 * Runs under `synchronized`. It first registers every host and executor in `offers` that has
 * not been seen before, drops offers whose node or executor is currently blacklisted, passes
 * the remaining offers through `shuffleOffers`, and then walks the non-zombie task sets in
 * the order returned by `rootPool.getSortedTaskSetQueue`, offering each one the resources at
 * every one of its locality levels until no further task can be launched.
 *
 * Barrier task sets are skipped for this round when fewer slots are available than the task
 * set has tasks, and are aborted if only part of their tasks received an offer.
 *
 * @param offers the worker offers available in this round
 * @return one sequence of task descriptions per filtered and shuffled offer
 * @throws SparkException if a barrier task set was only partially launched
 */
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) = new HashSet[String]()
    }
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
      newExecAvail = true
    }
  }
  // Only hosts for which a rack is known (Some(rack)) are recorded in hostsByRack.
  val hosts = offers.map(_.host).distinct
  for ((host, Some(rack)) <- hosts.zip(getRacksForHosts(hosts))) {
    hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += host
  }

  // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
  // this here to avoid a separate thread and added synchronization overhead, and also because
  // updating the blacklist is only relevant when task offers are being made.
  blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())

  val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
    offers.filter { offer =>
      !blacklistTracker.isNodeBlacklisted(offer.host) &&
        !blacklistTracker.isExecutorBlacklisted(offer.executorId)
    }
  }.getOrElse(offers)

  val shuffledOffers = shuffleOffers(filteredOffers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
  val availableResources = shuffledOffers.map(_.resources).toArray
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  for (taskSet <- sortedTaskSets) {
    // we only need to calculate available slots if using barrier scheduling, otherwise the
    // value is -1
    val availableSlots = if (taskSet.isBarrier) {
      val availableResourcesAmount = availableResources.map { resourceMap =>
        // note that the addresses here have been expanded according to the numParts
        resourceMap.map { case (name, addresses) => (name, addresses.length) }
      }
      calculateAvailableSlots(this, availableCpus, availableResourcesAmount)
    } else {
      -1
    }
    // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
    if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
      // Skip the launch process.
      // TODO SPARK-24819 If the job requires more slots than available (both busy and free
      // slots), fail the job on submit.
      logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
        s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
        s"number of available slots is $availableSlots.")
    } else {
      var launchedAnyTask = false
      // Record all the executor IDs assigned barrier tasks on.
      val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
      for (currentMaxLocality <- taskSet.myLocalityLevels) {
        var launchedTaskAtCurrentMaxLocality = false
        // Keep offering at this locality level for as long as the previous pass launched a task.
        do {
          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
            currentMaxLocality, shuffledOffers, availableCpus,
            availableResources, tasks, addressesWithDescs)
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
      }

      if (!launchedAnyTask) {
        taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
            // If the taskSet is unschedulable we try to find an existing idle blacklisted
            // executor. If we cannot find one, we abort immediately. Else we kill the idle
            // executor and kick off an abortTimer which if it doesn't schedule a task within
            // the timeout will abort the taskSet if we were unable to schedule any task from the
            // taskSet.
            // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
            // task basis.
            // Note 2: The taskSet can still be aborted when there are more than one idle
            // blacklisted executors and dynamic allocation is on. This can happen when a killed
            // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
            // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
            // timer to expire and abort the taskSet.
            executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
              case Some ((executorId, _)) =>
                if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
                  blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))

                  val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
                  unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
                  logInfo(s"Waiting for $timeout ms for completely "
                    + s"blacklisted task to be schedulable again before aborting $taskSet.")
                  abortTimer.schedule(
                    createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
                }
              case None => // Abort Immediately
                logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
                  s" executors can be found to kill. Aborting $taskSet." )
                taskSet.abortSinceCompletelyBlacklisted(taskIndex)
            }
        }
      } else {
        // We want to defer killing any taskSets as long as we have a non blacklisted executor
        // which can be used to schedule a task from any active taskSets. This ensures that the
        // job can make progress.
        // Note: It is theoretically possible that a taskSet never gets scheduled on a
        // non-blacklisted executor and the abort timer doesn't kick in because of a constant
        // submission of new TaskSets. See the PR for more details.
        if (unschedulableTaskSetToExpiryTime.nonEmpty) {
          logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +
            "recently scheduled.")
          unschedulableTaskSetToExpiryTime.clear()
        }
      }

      if (launchedAnyTask && taskSet.isBarrier) {
        // Check whether the barrier tasks are partially launched.
        // TODO SPARK-24818 handle the assert failure case (that can happen when some locality
        // requirements are not fulfilled, and we should revert the launched tasks).
        if (addressesWithDescs.size != taskSet.numTasks) {
          val errorMsg =
            s"Fail resource offers for barrier stage ${taskSet.stageId} because only " +
              s"${addressesWithDescs.size} out of a total number of ${taskSet.numTasks}" +
              s" tasks got resource offers. This happens because barrier execution currently " +
              s"does not work gracefully with delay scheduling. We highly recommend you to " +
              s"disable delay scheduling by setting spark.locality.wait=0 as a workaround if " +
              s"you see this error frequently."
          logWarning(errorMsg)
          taskSet.abort(errorMsg)
          throw new SparkException(errorMsg)
        }

        // materialize the barrier coordinator.
        maybeInitBarrierCoordinator()

        // Update the taskInfos into all the barrier task properties.
        val addressesStr = addressesWithDescs
          // Addresses ordered by partitionId
          .sortBy(_._2.partitionId)
          .map(_._1)
          .mkString(",")
        addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))

        logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +
          s"stage ${taskSet.stageId}.")
      }
    }
  }

  // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
  // launched within a configured time.
  // NOTE(review): `tasks` holds one buffer per shuffled offer, so `nonEmpty` appears to be true
  // whenever at least one offer survives filtering, even if every buffer is empty. Confirm
  // that this is the intended meaning of hasLaunchedTask.
  if (tasks.nonEmpty) {
    hasLaunchedTask = true
  }
  tasks
}
{code}
 

*Explanation and Solution*

The loop in {color:#00875a}TaskSchedulerImpl.resourceOffers{color} iterates 
over all non-zombie TaskSetManagers in the Pool's queue. Normally this queue 
is small, because a TaskSetManager is removed once all of its tasks are done. 
But in Spark Streaming jobs, as we all know, each receiver is wrapped in a 
non-stop job, which means its TaskSetManager stays in the queue until the 
application finishes. For example, when the 10th receiver is being launched 
the queue holds 10 TaskSetManagers, so the loop iterates 10 times; when the 
500th receiver is being launched, it iterates 500 times. However, 499 of 
those iterations are unnecessary, because their tasks are already 
running.  

When I dug deeper into the code, I found that whether a TaskSetManager still 
has pending tasks left is decided in 
{color:#00875a}TaskSetManager.dequeueTaskFromList{color} ({color:#ff0000}pic 5{color}), 
which is far away from the loop in 
{color:#00875a}TaskSchedulerImpl.resourceOffers{color}.
{code:java}
/**
 * Scans `list` from its tail towards its head for a task index that may be launched on the
 * given executor and host.
 *
 * Every entry that passes the blacklist check (and, when `speculative` is set, the
 * `hasAttemptOnHost` check) is removed from `list` as it is visited, whether or not it ends
 * up being returned.
 *
 * @param execId      executor the task would run on
 * @param host        host the task would run on
 * @param list        candidate task indices; mutated in place
 * @param speculative whether a speculative copy is being looked for
 * @return the first removed index that is not yet successful and has no running copy (or
 *         exactly one running copy when `speculative`), or `None` if no entry qualifies
 */
private def dequeueTaskFromList(
    execId: String,
    host: String,
    list: ArrayBuffer[Int],
    speculative: Boolean = false): Option[Int] = {
  var pos = list.size - 1
  while (pos >= 0) {
    val taskIndex = list(pos)
    val allowedHere = !isTaskBlacklistedOnExecOrNode(taskIndex, execId, host) &&
      !(speculative && hasAttemptOnHost(taskIndex, host))
    if (allowedHere) {
      // Usually this is the tail element, so the removal is cheap.
      list.remove(pos)
      // A speculatable task may only be launched while at most one copy of the
      // original task is running.
      if (!successful(taskIndex)) {
        val running = copiesRunning(taskIndex)
        if (running == 0 || (speculative && running == 1)) {
          return Some(taskIndex)
        }
      }
    }
    pos -= 1
  }
  None
}{code}
So I moved the pending-task check ahead, into the loop in 
{color:#00875a}TaskSchedulerImpl.resourceOffers{color} 
({color:#ff0000}pic 6{color}), and I also considered speculation mode. This 
prevents TaskSetManagers whose tasks are all already running or finished from 
entering the loop, which saves a lot of unnecessary iterations.
{code:java}
/**
 * Handles a batch of worker offers and assigns tasks to them.
 *
 * Registers executors seen for the first time, records host-to-rack mappings, drops offers from
 * blacklisted nodes/executors, shuffles the remaining offers, and then walks the non-zombie
 * TaskSetManagers in scheduling order, offering resources at each of their locality levels.
 * TaskSetManagers for which `hasPendingTasksToSched` returns false are skipped before any
 * per-taskSet work is done.
 *
 * @param offers the resources currently offered by executors
 * @return one buffer of task descriptions per shuffled offer
 * @throws SparkException if a barrier taskSet was only partially launched
 */
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) = new HashSet[String]()
    }
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
      newExecAvail = true
    }
  }
  val hosts = offers.map(_.host).distinct
  for ((host, Some(rack)) <- hosts.zip(getRacksForHosts(hosts))) {
    hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += host
  }

  // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
  // this here to avoid a separate thread and added synchronization overhead, and also because
  // updating the blacklist is only relevant when task offers are being made.
  blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())

  val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
    offers.filter { offer =>
      !blacklistTracker.isNodeBlacklisted(offer.host) &&
        !blacklistTracker.isExecutorBlacklisted(offer.executorId)
    }
  }.getOrElse(offers)

  val shuffledOffers = shuffleOffers(filteredOffers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
  val availableResources = shuffledOffers.map(_.resources).toArray
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  for (taskSet <- sortedTaskSets if hasPendingTasksToSched(taskSet)) {
    // we only need to calculate available slots if using barrier scheduling, otherwise the
    // value is -1
    val availableSlots = if (taskSet.isBarrier) {
      val availableResourcesAmount = availableResources.map { resourceMap =>
        // note that the addresses here have been expanded according to the numParts
        resourceMap.map { case (name, addresses) => (name, addresses.length) }
      }
      calculateAvailableSlots(this, availableCpus, availableResourcesAmount)
    } else {
      -1
    }
    // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
    if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
      // Skip the launch process.
      // TODO SPARK-24819 If the job requires more slots than available (both busy and free
      // slots), fail the job on submit.
      logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
        s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
        s"number of available slots is $availableSlots.")
    } else {
      var launchedAnyTask = false
      // Record all the executor IDs assigned barrier tasks on.
      val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
      for (currentMaxLocality <- taskSet.myLocalityLevels) {
        var launchedTaskAtCurrentMaxLocality = false
        do {
          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
            currentMaxLocality, shuffledOffers, availableCpus,
            availableResources, tasks, addressesWithDescs)
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
      }

      if (!launchedAnyTask) {
        taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
            // If the taskSet is unschedulable we try to find an existing idle blacklisted
            // executor. If we cannot find one, we abort immediately. Else we kill the idle
            // executor and kick off an abortTimer which if it doesn't schedule a task within
            // the timeout will abort the taskSet if we were unable to schedule any task from the
            // taskSet.
            // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
            // task basis.
            // Note 2: The taskSet can still be aborted when there are more than one idle
            // blacklisted executors and dynamic allocation is on. This can happen when a killed
            // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
            // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
            // timer to expire and abort the taskSet.
            executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
              case Some ((executorId, _)) =>
                if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
                  blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))

                  val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
                  unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
                  logInfo(s"Waiting for $timeout ms for completely "
                    + s"blacklisted task to be schedulable again before aborting $taskSet.")
                  abortTimer.schedule(
                    createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
                }
              case None => // Abort Immediately
                logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
                  s" executors can be found to kill. Aborting $taskSet." )
                taskSet.abortSinceCompletelyBlacklisted(taskIndex)
            }
        }
      } else {
        // We want to defer killing any taskSets as long as we have a non blacklisted executor
        // which can be used to schedule a task from any active taskSets. This ensures that the
        // job can make progress.
        // Note: It is theoretically possible that a taskSet never gets scheduled on a
        // non-blacklisted executor and the abort timer doesn't kick in because of a constant
        // submission of new TaskSets. See the PR for more details.
        if (unschedulableTaskSetToExpiryTime.nonEmpty) {
          logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +
            "recently scheduled.")
          unschedulableTaskSetToExpiryTime.clear()
        }
      }

      if (launchedAnyTask && taskSet.isBarrier) {
        // Check whether the barrier tasks are partially launched.
        // TODO SPARK-24818 handle the assert failure case (that can happen when some locality
        // requirements are not fulfilled, and we should revert the launched tasks).
        if (addressesWithDescs.size != taskSet.numTasks) {
          val errorMsg =
            s"Fail resource offers for barrier stage ${taskSet.stageId} because only " +
              s"${addressesWithDescs.size} out of a total number of ${taskSet.numTasks}" +
              s" tasks got resource offers. This happens because barrier execution currently " +
              s"does not work gracefully with delay scheduling. We highly recommend you to " +
              s"disable delay scheduling by setting spark.locality.wait=0 as a workaround if " +
              s"you see this error frequently."
          logWarning(errorMsg)
          taskSet.abort(errorMsg)
          throw new SparkException(errorMsg)
        }

        // materialize the barrier coordinator.
        maybeInitBarrierCoordinator()

        // Update the taskInfos into all the barrier task properties.
        val addressesStr = addressesWithDescs
          // Addresses ordered by partitionId
          .sortBy(_._2.partitionId)
          .map(_._1)
          .mkString(",")
        addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))

        logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +
          s"stage ${taskSet.stageId}.")
      }
    }
  }

  // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
  // launched within a configured time.
  if (tasks.nonEmpty) {
    hasLaunchedTask = true
  }
  return tasks
}


/**
 * Tells whether `taskSet` still has something a resource offer could launch.
 *
 * A task index counts as pending when the task has not succeeded and has no running copy,
 * or when the index is present in `taskSet.speculatableTasks`.
 *
 * @param taskSet the TaskSetManager to inspect
 * @return true if at least one task index is pending in the sense above
 */
def hasPendingTasksToSched(taskSet: TaskSetManager): Boolean = {
  // Walk the indices directly: only the index is needed, and zipWithIndex would build a new
  // collection of (task, index) tuples on every call.
  taskSet.tasks.indices.exists { index =>
    (!taskSet.successful(index) && taskSet.copiesRunning(index) == 0) ||
      taskSet.speculatableTasks.contains(index)
  }
}
{code}
*Conclusion*

With this change, the launch time of all receivers (500 receivers) is reduced 
to a stable ~50s. I think the Spark contributors may not have considered a 
scenario where a lot of jobs are running at the same time, which I know is 
unusual, but this would still be a good addition.


> TaskSchedulerImpl: Check pending tasks in advance when resource offers
> ----------------------------------------------------------------------
>
>                 Key: SPARK-33418
>                 URL: https://issues.apache.org/jira/browse/SPARK-33418
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.0.1
>            Reporter: dingbei
>            Priority: Major
>         Attachments: 1.png, 2.png, 3_1.png, 3_2.png, 3_3.png, 3_4.png, 6_2.png
>
>
> It begins with the needs to  start a lot of spark streaming receivers(custom 
> receivers) .  *The launch time gets super long when it comes to more than 300 
> receivers.* 
> *Tests preparation*
> environment: Yarn
> spark.executor.cores : 2
> spark.executor.instances : fist time 200, then 500
> *Tests and data*
> At first, we set the number of executors to 200 which means to start 200 
> receivers and everything goes well. It takes about 50s to launch all 
> receivers.({color:#ff0000}pic 1{color})
> Then we set the number of executors to 500 which means to start 500 
> receivers. The launch time became around 5 mins.({color:#ff0000}pic 2{color})
>  *Dig into souce code*
> I use Thread dump to check which methods takes relatively long 
> time.({color:#ff0000}pic 3{color}) Then I add logs between these methods. At 
> last I find that the loop in 
> {color:#00875a}TaskSchedulerImpl.resourceOffers{color} takes up most 
> percentage of the duration({color:#de350b}red color{color}).
> {color:#00875a}org.apache.spark.scheduler.TaskSchedulerImpl{color}
>  
> {code:java}
> def resourceOffers(offers: IndexedSeq[WorkerOffer]): 
> Seq[Seq[TaskDescription]] = synchronized {
>   // Mark each slave as alive and remember its hostname
>   // Also track if new executor is added
>   var newExecAvail = false
>   for (o <- offers) {
>     if (!hostToExecutors.contains(o.host)) {
>       hostToExecutors(o.host) = new HashSet[String]()
>     }
>     if (!executorIdToRunningTaskIds.contains(o.executorId)) {
>       hostToExecutors(o.host) += o.executorId
>       executorAdded(o.executorId, o.host)
>       executorIdToHost(o.executorId) = o.host
>       executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
>       newExecAvail = true
>     }
>   }
>   val hosts = offers.map(_.host).distinct
>   for ((host, Some(rack)) <- hosts.zip(getRacksForHosts(hosts))) {
>     hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += host
>   }
>   // Before making any offers, remove any nodes from the blacklist whose 
> blacklist has expired. Do
>   // this here to avoid a separate thread and added synchronization overhead, 
> and also because
>   // updating the blacklist is only relevant when task offers are being made.
>   blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
>   val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
>     offers.filter { offer =>
>       !blacklistTracker.isNodeBlacklisted(offer.host) &&
>         !blacklistTracker.isExecutorBlacklisted(offer.executorId)
>     }
>   }.getOrElse(offers)
>   val shuffledOffers = shuffleOffers(filteredOffers)
>   // Build a list of tasks to assign to each worker.
>   val tasks = shuffledOffers.map(o => new 
> ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
>   val availableResources = shuffledOffers.map(_.resources).toArray
>   val availableCpus = shuffledOffers.map(o => o.cores).toArray
>   val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
>   for (taskSet <- sortedTaskSets) {
>     logDebug("parentName: %s, name: %s, runningTasks: %s".format(
>       taskSet.parent.name, taskSet.name, taskSet.runningTasks))
>     if (newExecAvail) {
>       taskSet.executorAdded()
>     }
>   }
>   // Take each TaskSet in our scheduling order, and then offer it each node 
> in increasing order
>   // of locality levels so that it gets a chance to launch local tasks on all 
> of them.
>   // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, 
> RACK_LOCAL, ANY
>   for (taskSet <- sortedTaskSets) {
>     // we only need to calculate available slots if using barrier scheduling, 
> otherwise the
>     // value is -1
>     val availableSlots = if (taskSet.isBarrier) {
>       val availableResourcesAmount = availableResources.map { resourceMap =>
>         // note that the addresses here have been expanded according to the 
> numParts
>         resourceMap.map { case (name, addresses) => (name, addresses.length) }
>       }
>       calculateAvailableSlots(this, availableCpus, availableResourcesAmount)
>     } else {
>       -1
>     }
>     // Skip the barrier taskSet if the available slots are less than the 
> number of pending tasks.
>     if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
>       // Skip the launch process.
>       // TODO SPARK-24819 If the job requires more slots than available (both 
> busy and free
>       // slots), fail the job on submit.
>       logInfo(s"Skip current round of resource offers for barrier stage 
> ${taskSet.stageId} " +
>         s"because the barrier taskSet requires ${taskSet.numTasks} slots, 
> while the total " +
>         s"number of available slots is $availableSlots.")
>     } else {
>       var launchedAnyTask = false
>       // Record all the executor IDs assigned barrier tasks on.
>       val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
>       for (currentMaxLocality <- taskSet.myLocalityLevels) {
>         var launchedTaskAtCurrentMaxLocality = false
>         do {
>           launchedTaskAtCurrentMaxLocality = 
> resourceOfferSingleTaskSet(taskSet,
>             currentMaxLocality, shuffledOffers, availableCpus,
>             availableResources, tasks, addressesWithDescs)
>           launchedAnyTask |= launchedTaskAtCurrentMaxLocality
>         } while (launchedTaskAtCurrentMaxLocality)
>       }
>       if (!launchedAnyTask) {
>         taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { 
> taskIndex =>
>             // If the taskSet is unschedulable we try to find an existing 
> idle blacklisted
>             // executor. If we cannot find one, we abort immediately. Else we 
> kill the idle
>             // executor and kick off an abortTimer which if it doesn't 
> schedule a task within the
>             // the timeout will abort the taskSet if we were unable to 
> schedule any task from the
>             // taskSet.
>             // Note 1: We keep track of schedulability on a per taskSet basis 
> rather than on a per
>             // task basis.
>             // Note 2: The taskSet can still be aborted when there are more 
> than one idle
>             // blacklisted executors and dynamic allocation is on. This can 
> happen when a killed
>             // idle executor isn't replaced in time by 
> ExecutorAllocationManager as it relies on
>             // pending tasks and doesn't kill executors on idle timeouts, 
> resulting in the abort
>             // timer to expire and abort the taskSet.
>             executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match 
> {
>               case Some ((executorId, _)) =>
>                 if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
>                   blacklistTrackerOpt.foreach(blt => 
> blt.killBlacklistedIdleExecutor(executorId))
>                   val timeout = 
> conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
>                   unschedulableTaskSetToExpiryTime(taskSet) = 
> clock.getTimeMillis() + timeout
>                   logInfo(s"Waiting for $timeout ms for completely "
>                     + s"blacklisted task to be schedulable again before 
> aborting $taskSet.")
>                   abortTimer.schedule(
>                     createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), 
> timeout)
>                 }
>               case None => // Abort Immediately
>                 logInfo("Cannot schedule any task because of complete 
> blacklisting. No idle" +
>                   s" executors can be found to kill. Aborting $taskSet." )
>                 taskSet.abortSinceCompletelyBlacklisted(taskIndex)
>             }
>         }
>       } else {
>         // We want to defer killing any taskSets as long as we have a non 
> blacklisted executor
>         // which can be used to schedule a task from any active taskSets. 
> This ensures that the
>         // job can make progress.
>         // Note: It is theoretically possible that a taskSet never gets 
> scheduled on a
>         // non-blacklisted executor and the abort timer doesn't kick in 
> because of a constant
>         // submission of new TaskSets. See the PR for more details.
>         if (unschedulableTaskSetToExpiryTime.nonEmpty) {
>           logInfo("Clearing the expiry times for all unschedulable taskSets 
> as a task was " +
>             "recently scheduled.")
>           unschedulableTaskSetToExpiryTime.clear()
>         }
>       }
>       if (launchedAnyTask && taskSet.isBarrier) {
>         // Check whether the barrier tasks are partially launched.
>         // TODO SPARK-24818 handle the assert failure case (that can happen 
> when some locality
>         // requirements are not fulfilled, and we should revert the launched 
> tasks).
>         if (addressesWithDescs.size != taskSet.numTasks) {
>           val errorMsg =
>             s"Fail resource offers for barrier stage ${taskSet.stageId} 
> because only " +
>               s"${addressesWithDescs.size} out of a total number of 
> ${taskSet.numTasks}" +
>               s" tasks got resource offers. This happens because barrier 
> execution currently " +
>               s"does not work gracefully with delay scheduling. We highly 
> recommend you to " +
>               s"disable delay scheduling by setting spark.locality.wait=0 as 
> a workaround if " +
>               s"you see this error frequently."
>           logWarning(errorMsg)
>           taskSet.abort(errorMsg)
>           throw new SparkException(errorMsg)
>         }
>         // materialize the barrier coordinator.
>         maybeInitBarrierCoordinator()
>         // Update the taskInfos into all the barrier task properties.
>         val addressesStr = addressesWithDescs
>           // Addresses ordered by partitionId
>           .sortBy(_._2.partitionId)
>           .map(_._1)
>           .mkString(",")
>         addressesWithDescs.foreach(_._2.properties.setProperty("addresses", 
> addressesStr))
>         logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} 
> tasks for barrier " +
>           s"stage ${taskSet.stageId}.")
>       }
>     }
>   }
>   // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the 
> barrier tasks don't get
>   // launched within a configured time.
>   if (tasks.nonEmpty) {
>     hasLaunchedTask = true
>   }
>   return tasks
> }
> {code}
>  
> *Explaination and Solution*
> The loop in {color:#00875a}TaskSchedulerImpl.resourceOffers{color} will 
> iterate all none-zombie TaskSetManagers in a queue of Pool. Normally the size 
> of this queue is not so big because it gets removed when all of its tasks is 
> done. But for spark streaming jobs, we all konw receivers will be wrapped as 
> a non-stop job ,which means its TaskSetManager will exists  in the queue all 
> the time until the application is finished. For example, when it start to 
> launch the 10th receiver ,the size of the queue is 10 ,so it will iterates 10 
> times and when it starts to launch the 500th receiver, it will iterate 500 
> times . However 499 of the iteration are not necessay ,their task is already 
> on running .  
> When I digged deep into the code. I find that it decides whether a 
> TaskSetManagers still has pending tasks left in
>  {color:#00875a}TaskSetManagers .dequeueTaskFromList{color} which is far away 
> form the loop in {color:#00875a}TaskSchedulerImpl.resourceOffers{color}.
> {code:java}
> private def dequeueTaskFromList(
>     execId: String,
>     host: String,
>     list: ArrayBuffer[Int],
>     speculative: Boolean = false): Option[Int] = {
>   var indexOffset = list.size
>   while (indexOffset > 0) {
>     indexOffset -= 1
>     val index = list(indexOffset)
>     if (!isTaskBlacklistedOnExecOrNode(index, execId, host) &&
>         !(speculative && hasAttemptOnHost(index, host))) {
>       // This should almost always be list.trimEnd(1) to remove tail
>       list.remove(indexOffset)
>       // Speculatable task should only be launched when at most one copy of 
> the
>       // original task is running
>       if (!successful(index)) {
>         if (copiesRunning(index) == 0) {
>           return Some(index)
>         } else if (speculative && copiesRunning(index) == 1) {
>           return Some(index)
>         }
>       }
>     }
>   }
>   None
> }{code}
> So I move the pending tasks code ahead to  the loop in 
> {color:#00875a}TaskSchedulerImpl.resourceOffers{color}. ,and I also consided 
> the speculation mode.This prevent TaskSetManagers of which all tasks are 
> finished from getting into this loop,which saves a lot of unnecessay 
> iterations.
> {code:java}
> def resourceOffers(offers: IndexedSeq[WorkerOffer]): 
> Seq[Seq[TaskDescription]] = synchronized {
>   // Mark each slave as alive and remember its hostname
>   // Also track if new executor is added
>   var newExecAvail = false
>   for (o <- offers) {
>     if (!hostToExecutors.contains(o.host)) {
>       hostToExecutors(o.host) = new HashSet[String]()
>     }
>     if (!executorIdToRunningTaskIds.contains(o.executorId)) {
>       hostToExecutors(o.host) += o.executorId
>       executorAdded(o.executorId, o.host)
>       executorIdToHost(o.executorId) = o.host
>       executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
>       newExecAvail = true
>     }
>   }
>   val hosts = offers.map(_.host).distinct
>   for ((host, Some(rack)) <- hosts.zip(getRacksForHosts(hosts))) {
>     hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += host
>   }
>   // Before making any offers, remove any nodes from the blacklist whose 
> blacklist has expired. Do
>   // this here to avoid a separate thread and added synchronization overhead, 
> and also because
>   // updating the blacklist is only relevant when task offers are being made.
>   blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
>   val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
>     offers.filter { offer =>
>       !blacklistTracker.isNodeBlacklisted(offer.host) &&
>         !blacklistTracker.isExecutorBlacklisted(offer.executorId)
>     }
>   }.getOrElse(offers)
>   val shuffledOffers = shuffleOffers(filteredOffers)
>   // Build a list of tasks to assign to each worker.
>   val tasks = shuffledOffers.map(o => new 
> ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
>   val availableResources = shuffledOffers.map(_.resources).toArray
>   val availableCpus = shuffledOffers.map(o => o.cores).toArray
>   val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
>   for (taskSet <- sortedTaskSets) {
>     logDebug("parentName: %s, name: %s, runningTasks: %s".format(
>       taskSet.parent.name, taskSet.name, taskSet.runningTasks))
>     if (newExecAvail) {
>       taskSet.executorAdded()
>     }
>   }
>   // Take each TaskSet in our scheduling order, and then offer it each node 
> in increasing order
>   // of locality levels so that it gets a chance to launch local tasks on all 
> of them.
>   // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, 
> RACK_LOCAL, ANY
>   for (taskSet <- sortedTaskSets if hasPendingTasksToSched(taskSet)) {
>     // we only need to calculate available slots if using barrier scheduling, 
> otherwise the
>     // value is -1
>     val availableSlots = if (taskSet.isBarrier) {
>       val availableResourcesAmount = availableResources.map { resourceMap =>
>         // note that the addresses here have been expanded according to the 
> numParts
>         resourceMap.map { case (name, addresses) => (name, addresses.length) }
>       }
>       calculateAvailableSlots(this, availableCpus, availableResourcesAmount)
>     } else {
>       -1
>     }
>     // Skip the barrier taskSet if the available slots are less than the 
> number of pending tasks.
>     if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
>       // Skip the launch process.
>       // TODO SPARK-24819 If the job requires more slots than available (both 
> busy and free
>       // slots), fail the job on submit.
>       logInfo(s"Skip current round of resource offers for barrier stage 
> ${taskSet.stageId} " +
>         s"because the barrier taskSet requires ${taskSet.numTasks} slots, 
> while the total " +
>         s"number of available slots is $availableSlots.")
>     } else {
>       var launchedAnyTask = false
>       // Record all the executor IDs assigned barrier tasks on.
>       val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
>       for (currentMaxLocality <- taskSet.myLocalityLevels) {
>         var launchedTaskAtCurrentMaxLocality = false
>         do {
>           launchedTaskAtCurrentMaxLocality = 
> resourceOfferSingleTaskSet(taskSet,
>             currentMaxLocality, shuffledOffers, availableCpus,
>             availableResources, tasks, addressesWithDescs)
>           launchedAnyTask |= launchedTaskAtCurrentMaxLocality
>         } while (launchedTaskAtCurrentMaxLocality)
>       }
>       if (!launchedAnyTask) {
>         taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { 
> taskIndex =>
>             // If the taskSet is unschedulable we try to find an existing 
> idle blacklisted
>             // executor. If we cannot find one, we abort immediately. Else we 
> kill the idle
>             // executor and kick off an abortTimer which if it doesn't 
> schedule a task within the
>             // the timeout will abort the taskSet if we were unable to 
> schedule any task from the
>             // taskSet.
>             // Note 1: We keep track of schedulability on a per taskSet basis 
> rather than on a per
>             // task basis.
>             // Note 2: The taskSet can still be aborted when there are more 
> than one idle
>             // blacklisted executors and dynamic allocation is on. This can 
> happen when a killed
>             // idle executor isn't replaced in time by 
> ExecutorAllocationManager as it relies on
>             // pending tasks and doesn't kill executors on idle timeouts, 
> resulting in the abort
>             // timer to expire and abort the taskSet.
>             executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match 
> {
>               case Some ((executorId, _)) =>
>                 if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
>                   blacklistTrackerOpt.foreach(blt => 
> blt.killBlacklistedIdleExecutor(executorId))
>                   val timeout = 
> conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
>                   unschedulableTaskSetToExpiryTime(taskSet) = 
> clock.getTimeMillis() + timeout
>                   logInfo(s"Waiting for $timeout ms for completely "
>                     + s"blacklisted task to be schedulable again before 
> aborting $taskSet.")
>                   abortTimer.schedule(
>                     createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), 
> timeout)
>                 }
>               case None => // Abort Immediately
>                 logInfo("Cannot schedule any task because of complete 
> blacklisting. No idle" +
>                   s" executors can be found to kill. Aborting $taskSet." )
>                 taskSet.abortSinceCompletelyBlacklisted(taskIndex)
>             }
>         }
>       } else {
>         // We want to defer killing any taskSets as long as we have a non 
> blacklisted executor
>         // which can be used to schedule a task from any active taskSets. 
> This ensures that the
>         // job can make progress.
>         // Note: It is theoretically possible that a taskSet never gets 
> scheduled on a
>         // non-blacklisted executor and the abort timer doesn't kick in 
> because of a constant
>         // submission of new TaskSets. See the PR for more details.
>         if (unschedulableTaskSetToExpiryTime.nonEmpty) {
>           logInfo("Clearing the expiry times for all unschedulable taskSets 
> as a task was " +
>             "recently scheduled.")
>           unschedulableTaskSetToExpiryTime.clear()
>         }
>       }
>       if (launchedAnyTask && taskSet.isBarrier) {
>         // Check whether the barrier tasks are partially launched.
>         // TODO SPARK-24818 handle the assert failure case (that can happen 
> when some locality
>         // requirements are not fulfilled, and we should revert the launched 
> tasks).
>         if (addressesWithDescs.size != taskSet.numTasks) {
>           val errorMsg =
>             s"Fail resource offers for barrier stage ${taskSet.stageId} 
> because only " +
>               s"${addressesWithDescs.size} out of a total number of 
> ${taskSet.numTasks}" +
>               s" tasks got resource offers. This happens because barrier 
> execution currently " +
>               s"does not work gracefully with delay scheduling. We highly 
> recommend you to " +
>               s"disable delay scheduling by setting spark.locality.wait=0 as 
> a workaround if " +
>               s"you see this error frequently."
>           logWarning(errorMsg)
>           taskSet.abort(errorMsg)
>           throw new SparkException(errorMsg)
>         }
>         // materialize the barrier coordinator.
>         maybeInitBarrierCoordinator()
>         // Update the taskInfos into all the barrier task properties.
>         val addressesStr = addressesWithDescs
>           // Addresses ordered by partitionId
>           .sortBy(_._2.partitionId)
>           .map(_._1)
>           .mkString(",")
>         addressesWithDescs.foreach(_._2.properties.setProperty("addresses", 
> addressesStr))
>         logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} 
> tasks for barrier " +
>           s"stage ${taskSet.stageId}.")
>       }
>     }
>   }
>   // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the 
> barrier tasks don't get
>   // launched within a configured time.
>   if (tasks.nonEmpty) {
>     hasLaunchedTask = true
>   }
>   return tasks
> }
> def hasPendingTasksToSched(taskSet: TaskSetManager): Boolean = {
>   taskSet.tasks.zipWithIndex
>     .exists(t =>
>       (!taskSet.successful(t._2) && taskSet.copiesRunning(t._2) == 0)
>         || taskSet.speculatableTasks.contains(t._2))
> }
> {code}
> *conclusion*
> We managed to reduce the launch time of all receivers to around 50s stablely 
> (500 receivers).I think the spark contributors haven't thought a scenario 
> where a lot of job are running at the same time which I know is unusual but 
> still  a good complement。



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to