[ https://issues.apache.org/jira/browse/SPARK-33418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
dingbei updated SPARK-33418:
----------------------------
    Attachment: (was: 6_1.png)

TaskSchedulerImpl: Check pending tasks in advance when resource offers
----------------------------------------------------------------------

                 Key: SPARK-33418
                 URL: https://issues.apache.org/jira/browse/SPARK-33418
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 3.0.1
            Reporter: dingbei
            Priority: Major
         Attachments: 1.png, 2.png, 3_1.png, 3_2.png, 3_3.png, 3_4.png, 6_2.png

This issue begins with the need to start a large number of Spark Streaming receivers (custom receivers). *The launch time gets extremely long once there are more than 300 receivers.*

*Test preparation*

Environment: YARN
spark.executor.cores: 2
spark.executor.instances: first 200, then 500

*Tests and data*

First we set the number of executors to 200, i.e. 200 receivers, and everything went well: it took about 50s to launch all receivers ({color:#ff0000}pic 1{color}). Then we set the number of executors to 500, i.e. 500 receivers, and the launch time grew to around 5 minutes ({color:#ff0000}pic 2{color}).

*Digging into the source code*

I used thread dumps to find the methods that take a relatively long time ({color:#ff0000}pic 3{color}), then added logs around them. In the end I found that the loop in {color:#00875a}TaskSchedulerImpl.resourceOffers{color} accounts for most of the duration ({color:#de350b}red color{color}).

{color:#00875a}org.apache.spark.scheduler.TaskSchedulerImpl{color}

{code:java}
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) = new HashSet[String]()
    }
    if (!executorIdToRunningTaskIds.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
      newExecAvail = true
    }
  }
  val hosts = offers.map(_.host).distinct
  for ((host, Some(rack)) <- hosts.zip(getRacksForHosts(hosts))) {
    hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += host
  }

  // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
  // this here to avoid a separate thread and added synchronization overhead, and also because
  // updating the blacklist is only relevant when task offers are being made.
  blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())

  val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
    offers.filter { offer =>
      !blacklistTracker.isNodeBlacklisted(offer.host) &&
        !blacklistTracker.isExecutorBlacklisted(offer.executorId)
    }
  }.getOrElse(offers)

  val shuffledOffers = shuffleOffers(filteredOffers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
  val availableResources = shuffledOffers.map(_.resources).toArray
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  for (taskSet <- sortedTaskSets) {
    // we only need to calculate available slots if using barrier scheduling, otherwise the
    // value is -1
    val availableSlots = if (taskSet.isBarrier) {
      val availableResourcesAmount = availableResources.map { resourceMap =>
        // note that the addresses here have been expanded according to the numParts
        resourceMap.map { case (name, addresses) => (name, addresses.length) }
      }
      calculateAvailableSlots(this, availableCpus, availableResourcesAmount)
    } else {
      -1
    }
    // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
    if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
      // Skip the launch process.
      // TODO SPARK-24819 If the job requires more slots than available (both busy and free
      // slots), fail the job on submit.
      logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
        s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
        s"number of available slots is $availableSlots.")
    } else {
      var launchedAnyTask = false
      // Record all the executor IDs assigned barrier tasks on.
      val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
      for (currentMaxLocality <- taskSet.myLocalityLevels) {
        var launchedTaskAtCurrentMaxLocality = false
        do {
          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
            currentMaxLocality, shuffledOffers, availableCpus,
            availableResources, tasks, addressesWithDescs)
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
      }
      if (!launchedAnyTask) {
        taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
          // If the taskSet is unschedulable we try to find an existing idle blacklisted
          // executor. If we cannot find one, we abort immediately. Else we kill the idle
          // executor and kick off an abortTimer which if it doesn't schedule a task within
          // the timeout will abort the taskSet if we were unable to schedule any task from the
          // taskSet.
          // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
          // task basis.
          // Note 2: The taskSet can still be aborted when there are more than one idle
          // blacklisted executors and dynamic allocation is on. This can happen when a killed
          // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
          // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort
          // timer to expire and abort the taskSet.
          executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
            case Some((executorId, _)) =>
              if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
                blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
                val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
                unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
                logInfo(s"Waiting for $timeout ms for completely "
                  + s"blacklisted task to be schedulable again before aborting $taskSet.")
                abortTimer.schedule(
                  createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
              }
            case None => // Abort Immediately
              logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
                s" executors can be found to kill. Aborting $taskSet.")
              taskSet.abortSinceCompletelyBlacklisted(taskIndex)
          }
        }
      } else {
        // We want to defer killing any taskSets as long as we have a non blacklisted executor
        // which can be used to schedule a task from any active taskSets. This ensures that the
        // job can make progress.
        // Note: It is theoretically possible that a taskSet never gets scheduled on a
        // non-blacklisted executor and the abort timer doesn't kick in because of a constant
        // submission of new TaskSets. See the PR for more details.
        if (unschedulableTaskSetToExpiryTime.nonEmpty) {
          logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +
            "recently scheduled.")
          unschedulableTaskSetToExpiryTime.clear()
        }
      }

      if (launchedAnyTask && taskSet.isBarrier) {
        // Check whether the barrier tasks are partially launched.
        // TODO SPARK-24818 handle the assert failure case (that can happen when some locality
        // requirements are not fulfilled, and we should revert the launched tasks).
        if (addressesWithDescs.size != taskSet.numTasks) {
          val errorMsg =
            s"Fail resource offers for barrier stage ${taskSet.stageId} because only " +
              s"${addressesWithDescs.size} out of a total number of ${taskSet.numTasks}" +
              s" tasks got resource offers. This happens because barrier execution currently " +
              s"does not work gracefully with delay scheduling. We highly recommend you to " +
              s"disable delay scheduling by setting spark.locality.wait=0 as a workaround if " +
              s"you see this error frequently."
          logWarning(errorMsg)
          taskSet.abort(errorMsg)
          throw new SparkException(errorMsg)
        }

        // materialize the barrier coordinator.
        maybeInitBarrierCoordinator()

        // Update the taskInfos into all the barrier task properties.
        val addressesStr = addressesWithDescs
          // Addresses ordered by partitionId
          .sortBy(_._2.partitionId)
          .map(_._1)
          .mkString(",")
        addressesWithDescs.foreach(_._2.properties.setProperty("addresses", addressesStr))

        logInfo(s"Successfully scheduled all the ${addressesWithDescs.size} tasks for barrier " +
          s"stage ${taskSet.stageId}.")
      }
    }
  }

  // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
  // launched within a configured time.
  if (tasks.nonEmpty) {
    hasLaunchedTask = true
  }
  return tasks
}
{code}

*Explanation and Solution*

The loop in {color:#00875a}TaskSchedulerImpl.resourceOffers{color} iterates over all non-zombie TaskSetManagers in the Pool's queue. Normally this queue stays small, because a TaskSetManager is removed once all of its tasks are done. For Spark Streaming jobs, however, each receiver is wrapped in a non-stop job, which means its TaskSetManager remains in the queue until the application finishes. For example, when the 10th receiver is launched the queue holds 10 TaskSetManagers and the loop iterates 10 times; when the 500th receiver is launched it iterates 500 times, and 499 of those iterations are unnecessary because their only task is already running.
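As a point of reference, here is a minimal sketch of this kind of workload, using the public Spark Streaming receiver API. The class name, batch interval, and receiver count are illustrative, not the actual production code:

{code:java}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

// Each receiver instance runs as a single long-running task, so its
// TaskSetManager stays in the scheduler's pool for the application's lifetime.
class NoopReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  override def onStart(): Unit = {
    new Thread("noop-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          store("tick")
          Thread.sleep(1000)
        }
      }
    }.start()
  }
  override def onStop(): Unit = { }
}

object ManyReceivers {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("many-receivers"), Seconds(10))
    // 500 receiver input streams -> 500 long-running receiver tasks to launch.
    val streams = (1 to 500).map(_ => ssc.receiverStream(new NoopReceiver))
    ssc.union(streams).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}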
When I dug deeper into the code, I found that whether a TaskSetManager still has pending tasks is only decided in {color:#00875a}TaskSetManager.dequeueTaskFromList{color}, which is far away from the loop in {color:#00875a}TaskSchedulerImpl.resourceOffers{color}:

{code:java}
private def dequeueTaskFromList(
    execId: String,
    host: String,
    list: ArrayBuffer[Int],
    speculative: Boolean = false): Option[Int] = {
  var indexOffset = list.size
  while (indexOffset > 0) {
    indexOffset -= 1
    val index = list(indexOffset)
    if (!isTaskBlacklistedOnExecOrNode(index, execId, host) &&
        !(speculative && hasAttemptOnHost(index, host))) {
      // This should almost always be list.trimEnd(1) to remove tail
      list.remove(indexOffset)
      // Speculatable task should only be launched when at most one copy of the
      // original task is running
      if (!successful(index)) {
        if (copiesRunning(index) == 0) {
          return Some(index)
        } else if (speculative && copiesRunning(index) == 1) {
          return Some(index)
        }
      }
    }
  }
  None
}
{code}

So I moved the pending-tasks check ahead, into the loop in {color:#00875a}TaskSchedulerImpl.resourceOffers{color}, and also took speculation into account. This keeps TaskSetManagers that have no task left to schedule out of the loop, which saves a lot of unnecessary iterations. The modified {color:#00875a}resourceOffers{color} is identical to the original except for the guard on the TaskSet loop:

{code:java}
  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  for (taskSet <- sortedTaskSets if hasPendingTasksToSched(taskSet)) {
    // ... loop body unchanged ...
  }
{code}

and the new helper method:

{code:java}
def hasPendingTasksToSched(taskSet: TaskSetManager): Boolean = {
  taskSet.tasks.zipWithIndex
    .exists(t =>
      (!taskSet.successful(t._2) && taskSet.copiesRunning(t._2) == 0)
        || taskSet.speculatableTasks.contains(t._2))
}
{code}
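In isolation, the helper's semantics can be checked with a small self-contained model. This is a hypothetical sketch: {color:#00875a}TaskSetState{color} and {color:#00875a}hasPendingTasks{color} are stand-ins invented here, since TaskSetManager's bookkeeping fields are private to the scheduler package:

{code:java}
// Hypothetical stand-in for the TaskSetManager bookkeeping the helper reads.
final case class TaskSetState(
    successful: Array[Boolean], // task index -> finished successfully
    copiesRunning: Array[Int],  // task index -> number of running copies
    speculatable: Set[Int])     // task indices marked for speculation

def hasPendingTasks(ts: TaskSetState): Boolean =
  ts.successful.indices.exists { i =>
    (!ts.successful(i) && ts.copiesRunning(i) == 0) || ts.speculatable.contains(i)
  }

// A receiver task set: its single task runs forever -> nothing left to schedule.
assert(!hasPendingTasks(TaskSetState(Array(false), Array(1), Set.empty)))
// A freshly submitted task set: nothing running yet -> still pending.
assert(hasPendingTasks(TaskSetState(Array(false), Array(0), Set.empty)))
// A running task marked speculatable still counts as pending, so speculation keeps working.
assert(hasPendingTasks(TaskSetState(Array(false), Array(1), Set(0))))
{code}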
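As a rough back-of-envelope estimate (a hypothetical model that assumes one scheduling round per receiver launch), the cumulative number of TaskSetManager visits during the launch sequence drops from quadratic to linear in the number of receivers:

{code:java}
// Hypothetical model: receiver k is launched while k TaskSetManagers are resident.
val receivers = 500
val visitsWithoutGuard = (1 to receivers).map(_.toLong).sum // 500 * 501 / 2 = 125250 visits
val visitsWithGuard = receivers.toLong                      // ~1 pending TaskSet per round
{code}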
*Conclusion*

With this change we managed to stably reduce the launch time of all 500 receivers to around 50s. I think the Spark contributors had not considered a scenario where many long-running jobs sit in the pool at the same time; I know it is unusual, but this is still a good complement.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org