This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 99e503c  [SPARK-29263][SCHEDULER] Update `availableSlots` in `resourceOffers()` before checking available slots for barrier taskSet
99e503c is described below

commit 99e503cebfd9cb19372c88b0dd70c6743f864454
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Fri Sep 27 11:18:32 2019 -0700

    [SPARK-29263][SCHEDULER] Update `availableSlots` in `resourceOffers()` before checking available slots for barrier taskSet
    
    ### What changes were proposed in this pull request?
    
    `availableSlots` is computed once, before the for loop that iterates over all TaskSets in `resourceOffers()`. But the number of available slots changes in every iteration, because each iteration takes some of those slots. The number of available slots checked against a barrier taskSet therefore has to be recomputed in every iteration from `availableCpus`.
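    
    For illustration, a minimal self-contained sketch of the stale-count problem (the object name `SlotSketch` and the hard-coded numbers are invented for this example; only `CPUS_PER_TASK` and `availableCpus` mirror identifiers from the diff below):
    
    ```scala
    // Hypothetical stand-alone sketch -- not the Spark source -- of why the slot count
    // must be recomputed inside the loop over TaskSets.
    object SlotSketch {
      val CPUS_PER_TASK = 2
      // One entry per executor: free cores on that executor.
      val availableCpus: Array[Int] = Array(3, 3, 3)

      def currentSlots: Int = availableCpus.map(_ / CPUS_PER_TASK).sum

      def main(args: Array[String]): Unit = {
        val staleSlots = currentSlots      // computed once before the loop, as before this fix
        // A non-barrier TaskSet handled earlier in the loop takes one task's worth of cores.
        availableCpus(0) -= CPUS_PER_TASK
        // The stale value still reports 3 slots; recomputing from availableCpus reports 2,
        // which is the number the barrier check has to compare against taskSet.numTasks.
        println(s"stale = $staleSlots, recomputed = $currentSlots")  // stale = 3, recomputed = 2
      }
    }
    ```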
    
    ### Why are the changes needed?
    
    Bugfix.
    Without it, `resourceOffers` could attempt to start a barrier taskSet even though it does not have enough slots available. That attempt would then be caught by the `require` in line 519, which throws an exception; the exception gets caught and ignored by Dispatcher's MessageLoop, so nothing terrible happens, but it prevents `resourceOffers` from considering any further TaskSets in that round.
    Note that launching the barrier TaskSet can still fail if other requirements are not satisfied, and it can still be rolled back by the exception thrown in this `require`. Handling that more gracefully remains a TODO in SPARK-24818, but this fix should at least resolve the case where the barrier TaskSet cannot be launched because of insufficient slots.
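    
    As a schematic illustration of that failure mode, here is a hypothetical, self-contained sketch (not Spark's code; `FailureModeSketch` and its numbers are invented, loosely mirroring the new unit test added below): the stale count lets the barrier taskSet through, the all-or-nothing `require` then fails, and the swallowed exception means the remaining TaskSets are never considered in that offer round.
    
    ```scala
    object FailureModeSketch {
      case class FakeSet(name: String, isBarrier: Boolean, numTasks: Int)

      def main(args: Array[String]): Unit = {
        val staleSlots  = 3   // computed before the loop, already out of date
        val actualSlots = 2   // what is really left after a higher-priority TaskSet took cores
        val taskSets = Seq(
          FakeSet("barrier", isBarrier = true, numTasks = 3),
          FakeSet("normal", isBarrier = false, numTasks = 1))
        try {
          for (ts <- taskSets) {
            if (ts.isBarrier && staleSlots >= ts.numTasks) {
              // The stale check passes, so a launch is attempted; this all-or-nothing
              // requirement then fails, standing in for the `require` mentioned above.
              require(actualSlots >= ts.numTasks, s"cannot launch all ${ts.numTasks} barrier tasks")
            }
            println(s"considered ${ts.name}")
          }
        } catch {
          // Stands in for the MessageLoop swallowing the exception: "normal" is never considered.
          case e: IllegalArgumentException => println(s"offer round aborted: ${e.getMessage}")
        }
      }
    }
    ```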
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added UT
    
    Closes #23375
    
    Closes #25946 from juliuszsompolski/SPARK-29263.
    
    Authored-by: Juliusz Sompolski <ju...@databricks.com>
    Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com>
    (cherry picked from commit 420abb457df0f422f73bab19a6ed6d7c6bab3173)
    Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com>
---
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  2 +-
 .../org/apache/spark/scheduler/FakeTask.scala      | 36 +++++++++++----
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 51 ++++++++++++++++------
 3 files changed, 65 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index e194b79..38dbbe7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -391,7 +391,6 @@ private[spark] class TaskSchedulerImpl(
     // Build a list of tasks to assign to each worker.
     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
     val availableCpus = shuffledOffers.map(o => o.cores).toArray
-    val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum
     val sortedTaskSets = rootPool.getSortedTaskSetQueue
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
@@ -405,6 +404,7 @@ private[spark] class TaskSchedulerImpl(
     // of locality levels so that it gets a chance to launch local tasks on all of them.
     // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
     for (taskSet <- sortedTaskSets) {
+      val availableSlots = availableCpus.map(c => c / CPUS_PER_TASK).sum
       // Skip the barrier taskSet if the available slots are less than the number of pending tasks.
       if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
         // Skip the launch process.
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index b29d32f..abc8841 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -42,15 +42,23 @@ object FakeTask {
    * locations for each task (given as varargs) if this sequence is not empty.
    */
   def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
-    createTaskSet(numTasks, stageAttemptId = 0, prefLocs: _*)
+    createTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0, prefLocs: _*)
   }
 
-  def createTaskSet(numTasks: Int, stageAttemptId: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
-    createTaskSet(numTasks, stageId = 0, stageAttemptId, prefLocs: _*)
+  def createTaskSet(
+      numTasks: Int,
+      stageId: Int,
+      stageAttemptId: Int,
+      prefLocs: Seq[TaskLocation]*): TaskSet = {
+    createTaskSet(numTasks, stageId, stageAttemptId, priority = 0, prefLocs: _*)
   }
 
-  def createTaskSet(numTasks: Int, stageId: Int, stageAttemptId: Int, prefLocs: Seq[TaskLocation]*):
-  TaskSet = {
+  def createTaskSet(
+      numTasks: Int,
+      stageId: Int,
+      stageAttemptId: Int,
+      priority: Int,
+      prefLocs: Seq[TaskLocation]*): TaskSet = {
     if (prefLocs.size != 0 && prefLocs.size != numTasks) {
       throw new IllegalArgumentException("Wrong number of task locations")
     }
@@ -65,6 +73,15 @@ object FakeTask {
       stageId: Int,
       stageAttemptId: Int,
       prefLocs: Seq[TaskLocation]*): TaskSet = {
+    createShuffleMapTaskSet(numTasks, stageId, stageAttemptId, priority = 0, prefLocs: _*)
+  }
+
+  def createShuffleMapTaskSet(
+      numTasks: Int,
+      stageId: Int,
+      stageAttemptId: Int,
+      priority: Int,
+      prefLocs: Seq[TaskLocation]*): TaskSet = {
     if (prefLocs.size != 0 && prefLocs.size != numTasks) {
       throw new IllegalArgumentException("Wrong number of task locations")
     }
@@ -74,17 +91,18 @@ object FakeTask {
       }, prefLocs(i), new Properties,
         SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array())
     }
-    new TaskSet(tasks, stageId, stageAttemptId, priority = 0, null)
+    new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null)
   }
 
   def createBarrierTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
-    createBarrierTaskSet(numTasks, stageId = 0, stageAttempId = 0, prefLocs: _*)
+    createBarrierTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0, prefLocs: _*)
   }
 
   def createBarrierTaskSet(
       numTasks: Int,
       stageId: Int,
-      stageAttempId: Int,
+      stageAttemptId: Int,
+      priority: Int,
       prefLocs: Seq[TaskLocation]*): TaskSet = {
     if (prefLocs.size != 0 && prefLocs.size != numTasks) {
       throw new IllegalArgumentException("Wrong number of task locations")
@@ -92,6 +110,6 @@ object FakeTask {
     val tasks = Array.tabulate[Task[_]](numTasks) { i =>
       new FakeTask(stageId, i, if (prefLocs.size != 0) prefLocs(i) else Nil, isBarrier = true)
     }
-    new TaskSet(tasks, stageId, stageAttempId, priority = 0, null)
+    new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null)
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 8b60f8a..5c0601eb03 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -214,19 +214,19 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       taskScheduler.taskSetManagerForAttempt(taskset.stageId, taskset.stageAttemptId).get.isZombie
     }
 
-    val attempt1 = FakeTask.createTaskSet(1, 0)
+    val attempt1 = FakeTask.createTaskSet(1, stageId = 0, stageAttemptId = 0)
     taskScheduler.submitTasks(attempt1)
     // The first submitted taskset is active
     assert(!isTasksetZombie(attempt1))
 
-    val attempt2 = FakeTask.createTaskSet(1, 1)
+    val attempt2 = FakeTask.createTaskSet(1, stageId = 0, stageAttemptId = 1)
     taskScheduler.submitTasks(attempt2)
     // The first submitted taskset is zombie now
     assert(isTasksetZombie(attempt1))
     // The newly submitted taskset is active
     assert(!isTasksetZombie(attempt2))
 
-    val attempt3 = FakeTask.createTaskSet(1, 2)
+    val attempt3 = FakeTask.createTaskSet(1, stageId = 0, stageAttemptId = 2)
     taskScheduler.submitTasks(attempt3)
     // The first submitted taskset remains zombie
     assert(isTasksetZombie(attempt1))
@@ -241,7 +241,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
 
     val numFreeCores = 1
     val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores))
-    val attempt1 = FakeTask.createTaskSet(10)
+    val attempt1 = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = 0)
 
     // submit attempt 1, offer some resources, some tasks get scheduled
     taskScheduler.submitTasks(attempt1)
@@ -257,7 +257,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(0 === taskDescriptions2.length)
 
     // if we schedule another attempt for the same stage, it should get scheduled
-    val attempt2 = FakeTask.createTaskSet(10, 1)
+    val attempt2 = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = 1)
 
     // submit attempt 2, offer some resources, some tasks get scheduled
     taskScheduler.submitTasks(attempt2)
@@ -273,7 +273,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
 
     val numFreeCores = 10
     val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores))
-    val attempt1 = FakeTask.createTaskSet(10)
+    val attempt1 = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = 0)
 
     // submit attempt 1, offer some resources, some tasks get scheduled
     taskScheduler.submitTasks(attempt1)
@@ -289,7 +289,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(0 === taskDescriptions2.length)
 
     // submit attempt 2
-    val attempt2 = FakeTask.createTaskSet(10, 1)
+    val attempt2 = FakeTask.createTaskSet(10, stageId = 0, stageAttemptId = 1)
     taskScheduler.submitTasks(attempt2)
 
     // attempt 1 finished (this can happen even if it was marked zombie earlier -- all tasks were
@@ -483,7 +483,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
 
   test("abort stage when all executors are blacklisted and we cannot acquire 
new executor") {
     taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
-    val taskSet = FakeTask.createTaskSet(numTasks = 10, stageAttemptId = 0)
+    val taskSet = FakeTask.createTaskSet(numTasks = 10)
     taskScheduler.submitTasks(taskSet)
     val tsm = stageToMockTaskSetManager(0)
 
@@ -525,7 +525,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0")
 
     // We have only 1 task remaining with 1 executor
-    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
+    val taskSet = FakeTask.createTaskSet(numTasks = 1)
     taskScheduler.submitTasks(taskSet)
     val tsm = stageToMockTaskSetManager(0)
 
@@ -561,7 +561,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "10")
 
     // We have only 1 task remaining with 1 executor
-    val taskSet = FakeTask.createTaskSet(numTasks = 1, stageAttemptId = 0)
+    val taskSet = FakeTask.createTaskSet(numTasks = 1)
     taskScheduler.submitTasks(taskSet)
     val tsm = stageToMockTaskSetManager(0)
 
@@ -907,7 +907,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   test("SPARK-16106 locality levels updated if executor added to existing host") {
     val taskScheduler = setupScheduler()
 
-    taskScheduler.submitTasks(FakeTask.createTaskSet(2, 0,
+    taskScheduler.submitTasks(FakeTask.createTaskSet(2, stageId = 0, stageAttemptId = 0,
       (0 until 2).map { _ => Seq(TaskLocation("host0", "executor2")) }: _*
     ))
 
@@ -945,7 +945,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   test("scheduler checks for executors that can be expired from blacklist") {
     taskScheduler = setupScheduler()
 
-    taskScheduler.submitTasks(FakeTask.createTaskSet(1, 0))
+    taskScheduler.submitTasks(FakeTask.createTaskSet(1, stageId = 0, stageAttemptId = 0))
     taskScheduler.resourceOffers(IndexedSeq(
       new WorkerOffer("executor0", "host0", 1)
     )).flatten
@@ -1251,6 +1251,29 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(3 === taskDescriptions.length)
   }
 
+  test("SPARK-29263: barrier TaskSet can't schedule when higher prio taskset 
takes the slots") {
+    val taskCpus = 2
+    val taskScheduler = setupSchedulerWithMaster(
+      s"local[$taskCpus]",
+      config.CPUS_PER_TASK.key -> taskCpus.toString)
+
+    val numFreeCores = 3
+    val workerOffers = IndexedSeq(
+      new WorkerOffer("executor0", "host0", numFreeCores, 
Some("192.168.0.101:49625")),
+      new WorkerOffer("executor1", "host1", numFreeCores, 
Some("192.168.0.101:49627")),
+      new WorkerOffer("executor2", "host2", numFreeCores, 
Some("192.168.0.101:49629")))
+    val barrier = FakeTask.createBarrierTaskSet(3, stageId = 0, stageAttemptId 
= 0, priority = 1)
+    val highPrio = FakeTask.createTaskSet(1, stageId = 1, stageAttemptId = 0, 
priority = 0)
+
+    // submit highPrio and barrier taskSet
+    taskScheduler.submitTasks(highPrio)
+    taskScheduler.submitTasks(barrier)
+    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    // it schedules the highPrio task first, and then will not have enough slots to schedule
+    // the barrier taskset
+    assert(1 === taskDescriptions.length)
+  }
+
   test("cancelTasks shall kill all the running tasks and fail the stage") {
     val taskScheduler = setupScheduler()
 
@@ -1266,7 +1289,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       }
     })
 
-    val attempt1 = FakeTask.createTaskSet(10, 0)
+    val attempt1 = FakeTask.createTaskSet(10)
     taskScheduler.submitTasks(attempt1)
 
     val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),
@@ -1297,7 +1320,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       }
     })
 
-    val attempt1 = FakeTask.createTaskSet(10, 0)
+    val attempt1 = FakeTask.createTaskSet(10)
     taskScheduler.submitTasks(attempt1)
 
     val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
