Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21758#discussion_r204912925
  
    --- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -359,20 +368,56 @@ private[spark] class TaskSchedulerImpl(
         // of locality levels so that it gets a chance to launch local tasks 
on all of them.
         // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
         for (taskSet <- sortedTaskSets) {
    -      var launchedAnyTask = false
    -      var launchedTaskAtCurrentMaxLocality = false
    -      for (currentMaxLocality <- taskSet.myLocalityLevels) {
    -        do {
    -          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
    -            taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
    -          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
    -        } while (launchedTaskAtCurrentMaxLocality)
    -      }
    -      if (!launchedAnyTask) {
    -        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    +      // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
    +      if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
    +        // Skip the launch process.
    +        // TODO SPARK-24819 If the job requires more slots than available 
(both busy and free
    +        // slots), fail the job on submit.
    +        logInfo(s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
    +          s"because the barrier taskSet requires ${taskSet.numTasks} 
slots, while the total " +
    +          s"number of available slots is ${availableSlots}.")
    +      } else {
    +        var launchedAnyTask = false
    +        var launchedTaskAtCurrentMaxLocality = false
    +        // Record all the executor IDs assigned barrier tasks on.
    +        val addresses = ArrayBuffer[String]()
    +        val taskDescs = ArrayBuffer[TaskDescription]()
    +        for (currentMaxLocality <- taskSet.myLocalityLevels) {
    +          do {
    +            launchedTaskAtCurrentMaxLocality = 
resourceOfferSingleTaskSet(taskSet,
    +              currentMaxLocality, shuffledOffers, availableCpus, tasks, 
addresses, taskDescs)
    +            launchedAnyTask |= launchedTaskAtCurrentMaxLocality
    +          } while (launchedTaskAtCurrentMaxLocality)
    +        }
    +        if (!launchedAnyTask) {
    +          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    +        }
    +        if (launchedAnyTask && taskSet.isBarrier) {
    +          // Check whether the barrier tasks are partially launched.
    +          // TODO SPARK-24818 handle the assert failure case (that can 
happen when some locality
    +          // requirements are not fulfilled, and we should revert the 
launched tasks).
    +          require(taskDescs.size == taskSet.numTasks,
    +            s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
    +              s"because only ${taskDescs.size} out of a total number of 
${taskSet.numTasks} " +
    +              "tasks got resource offers. The resource offers may have 
been blacklisted or " +
    +              "cannot fulfill task locality requirements.")
    +
    +          // Update the taskInfos into all the barrier task properties.
    +          val addressesStr = addresses.zip(taskDescs)
    +            // Addresses ordered by partitionId
    +            .sortBy(_._2.partitionId)
    +            .map(_._1)
    +            .mkString(",")
    +          taskDescs.foreach(_.properties.setProperty("addresses", 
addressesStr))
    +
    +          logInfo(s"Successfully scheduled all the ${taskDescs.size} tasks 
for barrier stage " +
    +            s"${taskSet.stageId}.")
    +        }
           }
         }
     
    +    // TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the 
barrier tasks don't get
    +    // launched within a configured time.
    --- End diff --
    
    With concurrently executing jobs, one job could easily cause starvation for
the barrier job, right?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to