This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b77b31  [SPARK-18886][CORE][FOLLOWUP] allow follow up locality resets even if no task was launched
8b77b31 is described below

commit 8b77b3183576da9c85d3939f5759e011c93d4fc5
Author: Nicholas Marcott <481161+bmarc...@users.noreply.github.com>
AuthorDate: Wed Apr 22 08:25:24 2020 -0500

    [SPARK-18886][CORE][FOLLOWUP] allow follow up locality resets even if no task was launched
    
    ### What changes were proposed in this pull request?
    Remove the requirement to launch a task in order to reset the locality wait timer.
    
    ### Why are the changes needed?
    Recently https://github.com/apache/spark/pull/27207 was merged, but it contained a bug that leads to undesirable behavior.
    
    The crux of the issue is that single resource offers couldn't reset the timer if there had been a previous reject followed by an allResourceOffer with no available resources.
    This led to a problem where, once the locality level reached ANY, single resource offers were all accepted, leaving allResourceOffers with no resources to utilize (hence no task launched on an all-resource offer -> no timer reset). The task set manager would be stuck at the ANY locality level.
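
    To make this concrete, here is the reset logic condensed from the diff below, with the problematic path annotated (an illustrative excerpt referencing the surrounding scheduler state, not standalone code):

    ```scala
    // Before this change: an offer that launched no task (e.g. an allFreeResources
    // offer with zero free cores) fell into the else branch and cleared the flag,
    // even though nothing was rejected due to locality.
    if (noDelaySchedulingRejects && launchedAnyTask) {
      if (isAllFreeResources || resetOnPreviousOffer.getOrElse(taskSet.taskSet, true)) {
        taskSet.resetDelayScheduleTimer(globalMinLocality)
        resetOnPreviousOffer.update(taskSet.taskSet, true)
      }
    } else {
      resetOnPreviousOffer.update(taskSet.taskSet, false) // hit even with no reject
    }

    // After this change: only an actual delay-scheduling reject clears the flag;
    // an offer that launches nothing leaves it untouched. Launching a task is
    // still required for the timer reset itself.
    if (noDelaySchedulingRejects) {
      if (launchedAnyTask &&
        (isAllFreeResources || noRejectsSinceLastReset.getOrElse(taskSet.taskSet, true))) {
        taskSet.resetDelayScheduleTimer(globalMinLocality)
        noRejectsSinceLastReset.update(taskSet.taskSet, true)
      }
    } else {
      noRejectsSinceLastReset.update(taskSet.taskSet, false)
    }
    ```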
    
    Noting down here the downsides of the reset conditions below, in case we want to follow up.
    As this is quite complex, I could easily be missing something, so please comment/respond if you have more bad-behavior scenarios or find something wrong here:
    The format is:
    
    > **Reset condition**
    >  - the unwanted side effect
    >      - the cause/use case
    
    Below, references to locality increase/decrease mean:
    ```
    PROCESS_LOCAL, NODE_LOCAL ... .. ANY
        ------ locality decrease --->
       <----- locality increase -----
    ```
    
    **Task launch:**
    - locality decrease:
       - Blacklisting, FAIR/FIFO scheduling, or task resource requirements can minimize tasks launched
    - locality increase:
       - single task launch decreases locality despite many tasks remaining
    
    **No delay schedule reject since last allFreeResource offer**
    - locality decrease:
       - locality wait less than allFreeResource offer frequency, which occurs at least once per second
    - locality increase:
       - single resource (or none) not rejected despite many tasks remaining (other lower priority tasks utilizing resources)
    
    **Current impl - No delay schedule reject since last (allFreeResource offer + task launch)**
    - locality decrease:
       - all from above
    - locality increase:
       - single resource accepted and task launched despite many tasks remaining
    
    The current impl is an improvement on the legacy one (task launch) in that the unintended locality decrease case is similar, and the unintended locality increase case only occurs when the cluster is fully utilized.
    
    For the locality increase cases, perhaps a config specifying a certain % of tasks in a taskset that must finish before locality levels are reset would be helpful.
    
    **If** that was considered a good approach, then perhaps removing the task launch requirement would eliminate most of the downsides listed above.
    Let me know if you have more ideas for eliminating the locality increase downside of **No delay schedule reject since last allFreeResource offer**.
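
    For illustration only, a hypothetical sketch of the "% of tasks finished" check mentioned above (the config key and helper names are made up for this example; nothing like this is added by this PR):

    ```scala
    // Hypothetical: reset locality levels only once a fraction of the taskset has
    // finished, e.g. a made-up config such as spark.locality.wait.resetFraction.
    object LocalityResetFractionSketch {
      def mayResetLocality(finishedTasks: Int, totalTasks: Int, resetFraction: Double): Boolean =
        totalTasks > 0 && finishedTasks.toDouble / totalTasks >= resetFraction

      def main(args: Array[String]): Unit = {
        println(mayResetLocality(finishedTasks = 2, totalTasks = 10, resetFraction = 0.25)) // false
        println(mayResetLocality(finishedTasks = 3, totalTasks = 10, resetFraction = 0.25)) // true
      }
    }
    ```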
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    TaskSchedulerImplSuite
    
    Also manually tested, similar to how I tested in https://github.com/apache/spark/pull/27207, using [this simple app](https://github.com/bmarcott/spark-test-apps/blob/master/src/main/scala/TestLocalityWait.scala).
    
    With the new changes, given a locality wait of 10s, the behavior is generally: 10 seconds of locality being respected, followed by a single full utilization of resources at the ANY locality level, followed by 10 seconds of locality being respected, and so on.
    
    If the legacy flag is enabled (spark.locality.wait.legacyResetOnTaskLaunch=true), the behavior is to only schedule PROCESS_LOCAL tasks (utilizing only a single executor).
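
    For reference, a sketch of how the manual test can be configured (assumes the linked TestLocalityWait-style app supplies the actual workload; only the two configs named above are relevant):

    ```scala
    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch of the driver setup for the manual test described above.
    // The master URL is expected to come from spark-submit.
    object LocalityWaitManualTest {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("locality-wait-manual-test")
          .set("spark.locality.wait", "10s")
          // Flip this on to reproduce the legacy behavior described above:
          // .set("spark.locality.wait.legacyResetOnTaskLaunch", "true")
        val sc = new SparkContext(conf)
        try {
          // ... run a locality-sensitive workload here ...
        } finally {
          sc.stop()
        }
      }
    }
    ```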
    
    cloud-fan
    tgravescs
    
    Closes #28188 from bmarcott/nmarcott-locality-fix.
    
    Authored-by: Nicholas Marcott <481161+bmarc...@users.noreply.github.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 16 ++---
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 69 +++++++++++++++++++++-
 2 files changed, 75 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 2e3e1cc..d327099 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -111,8 +111,9 @@ private[spark] class TaskSchedulerImpl(
   private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]
 
   // keyed by taskset
-  // value is true if the task set's locality wait timer was reset on the last resource offer
-  private val resetOnPreviousOffer = new mutable.HashMap[TaskSet, Boolean]()
+  // value is true if the task set has not rejected any resources due to locality
+  // since the timer was last reset
+  private val noRejectsSinceLastReset = new mutable.HashMap[TaskSet, Boolean]()
   private val legacyLocalityWaitReset = conf.get(LEGACY_LOCALITY_WAIT_RESET)
 
   // Protected by `this`
@@ -336,7 +337,7 @@ private[spark] class TaskSchedulerImpl(
         taskSetsByStageIdAndAttempt -= manager.taskSet.stageId
       }
     }
-    resetOnPreviousOffer -= manager.taskSet
+    noRejectsSinceLastReset -= manager.taskSet
     manager.parent.removeSchedulable(manager)
     logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" +
       s" ${manager.parent.name}")
@@ -615,13 +616,14 @@ private[spark] class TaskSchedulerImpl(
         }
 
         if (!legacyLocalityWaitReset) {
-          if (noDelaySchedulingRejects && launchedAnyTask) {
-            if (isAllFreeResources || resetOnPreviousOffer.getOrElse(taskSet.taskSet, true)) {
+          if (noDelaySchedulingRejects) {
+            if (launchedAnyTask &&
+              (isAllFreeResources || noRejectsSinceLastReset.getOrElse(taskSet.taskSet, true))) {
               taskSet.resetDelayScheduleTimer(globalMinLocality)
-              resetOnPreviousOffer.update(taskSet.taskSet, true)
+              noRejectsSinceLastReset.update(taskSet.taskSet, true)
             }
           } else {
-            resetOnPreviousOffer.update(taskSet.taskSet, false)
+            noRejectsSinceLastReset.update(taskSet.taskSet, false)
           }
         }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 056c342..a8541cb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -296,6 +296,67 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       .flatten.isEmpty)
   }
 
+  test("SPARK-18886 - task set with no locality requirements should not starve one with them") {
+    val clock = new ManualClock()
+    // All tasks created here are local to exec1, host1.
+    // Locality level starts at PROCESS_LOCAL.
+    val taskScheduler = setupTaskSchedulerForLocalityTests(clock)
+    // Locality levels increase at 3000 ms.
+    val advanceAmount = 2000
+
+    val taskSet2 = FakeTask.createTaskSet(8, 2, 0)
+    taskScheduler.submitTasks(taskSet2)
+
+    // Stage 2 takes resource since it has no locality requirements
+    assert(taskScheduler
+      .resourceOffers(
+        IndexedSeq(WorkerOffer("exec2", "host1", 1)),
+        isAllFreeResources = false)
+      .flatten
+      .headOption
+      .map(_.name)
+      .getOrElse("")
+      .contains("stage 2.0"))
+
+    // Clock advances to 2s. No locality changes yet.
+    clock.advance(advanceAmount)
+
+    // Stage 2 takes resource since it has no locality requirements
+    assert(taskScheduler
+      .resourceOffers(
+        IndexedSeq(WorkerOffer("exec2", "host1", 1)),
+        isAllFreeResources = false)
+      .flatten
+      .headOption
+      .map(_.name)
+      .getOrElse("")
+      .contains("stage 2.0"))
+
+    // Simulates:
+    // 1. stage 2 has taken all resource offers through single resource offers
+    // 2. stage 1 is offered 0 cpus on allResourceOffer.
+    // This should not reset timer.
+    assert(taskScheduler
+      .resourceOffers(
+        IndexedSeq(WorkerOffer("exec2", "host1", 0)),
+        isAllFreeResources = true)
+      .flatten.length === 0)
+
+    // This should move stage 1 to NODE_LOCAL.
+    clock.advance(advanceAmount)
+
+    // Stage 1 should now accept NODE_LOCAL resource.
+    assert(taskScheduler
+      .resourceOffers(
+        IndexedSeq(WorkerOffer("exec2", "host1", 1)),
+        isAllFreeResources = false)
+      .flatten
+      .headOption
+      .map(_.name)
+      .getOrElse("")
+      .contains("stage 1.1"))
+  }
+
   test("SPARK-18886 - partial resource offers (isAllFreeResources = false) reset " +
    "time if last full resource offer (isAllResources = true) was accepted as well as any " +
    "following partial resource offers") {
@@ -306,12 +367,14 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // Locality levels increase at 3000 ms.
     val advanceAmount = 3000
 
-    // PROCESS_LOCAL full resource offer is accepted.
+    // PROCESS_LOCAL full resource offer is not rejected due to locality.
+    // It has 0 available cores, so no task is launched.
+    // Timer is reset and locality level remains at PROCESS_LOCAL.
     assert(taskScheduler
       .resourceOffers(
-        IndexedSeq(WorkerOffer("exec1", "host1", 1)),
+        IndexedSeq(WorkerOffer("exec1", "host1", 0)),
         isAllFreeResources = true)
-      .flatten.length === 1)
+      .flatten.length === 0)
 
     // Advancing clock increases locality level to NODE_LOCAL.
     clock.advance(advanceAmount)

