GitHub user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15644#discussion_r88615462
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -437,62 +438,76 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           new WorkerOffer("executor3", "host2", 1)
         )
         // We should check the node & exec blacklists, but only O(numOffers), not O(numPendingTasks)
    -    // times.  Its O(numOffers), not exactly numOffers, because we offer resources multiple
    -    // times.  An upper bound on the worst case is -- we check it once for every core at each
    -    // locality level.  (We could tighten the bound a bit but that should be a good enough check.)
    +    // times.  In the worst case, after shuffling, we offer our blacklisted resource first, and then
    +    // offer other resources which do get used.  The taskset blacklist is consulted repeatedly as
    +    // we offer resources to the taskset -- each iteration either schedules something, or it
    +    // terminates that locality level, so the maximum number of checks is numCores + numLocalityLevels
         val numCoresOnAllOffers = offers.map(_.cores).sum
         val numLocalityLevels = TaskLocality.values.size
    -    val maxBlacklistChecks = numCoresOnAllOffers * numLocalityLevels
    -
    -    // Setup the blacklist, and get back a list of the executors & nodes that have any blacklisting
    -    // (even implicit blacklisting).
    +    val maxBlacklistChecks = numCoresOnAllOffers + numLocalityLevels
     
    +    // Setup the blacklist
         nodeBlacklist.foreach { node =>
           when(stageToMockTaskSetBlacklist(0).isNodeBlacklistedForTaskSet(node)).thenReturn(true)
         }
         execBlacklist.foreach { exec =>
           when(stageToMockTaskSetBlacklist(0).isExecutorBlacklistedForTaskSet(exec)).thenReturn(true)
         }
    +
    +    // Figure out which nodes have any effective blacklisting on them.  This means all nodes that
    +    // are explicitly blacklisted, plus those that have *any* executors blacklisted.
         val nodesForBlacklistedExecutors = offers.filter { offer =>
           execBlacklist.contains(offer.executorId)
         }.map(_.host).toSet.toSeq
    -    val nodesToCheck = nodeBlacklist ++ nodesForBlacklistedExecutors
    +    val nodesWithAnyBlacklisting = nodeBlacklist ++ nodesForBlacklistedExecutors
    +    // Similarly, figure out which executors have any blacklisting.  This means all executors that
    +    // are explicitly blacklisted, plus all executors on nodes that are blacklisted.
         val execsForBlacklistedNodes = offers.filter { offer =>
           nodeBlacklist.contains(offer.host)
         }.map(_.executorId).toSeq
    -    val executorsToCheck = execBlacklist ++ execsForBlacklistedNodes
    +    val executorsWithAnyBlacklisting = execBlacklist ++ execsForBlacklistedNodes
     
    -    // Schedule a taskset, do a bit of basic sanity checking (that our test is operating the way its
    -    // supposed to).
    +    // Schedule a taskset, and make sure our test setup is correct -- we are able to schedule
    +    // a task on all executors that aren't blacklisted (even the ones implicitly blacklisted by the
    +    // node blacklist).
         val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
    -    assert(firstTaskAttempts.size === 3 - executorsToCheck.size)
    -    assert(firstTaskAttempts.size < offers.size)
    +    assert(firstTaskAttempts.size === 3 - executorsWithAnyBlacklisting.size)
     
         // Now check that we haven't made too many calls to any of the blacklist methods.
    +    // We should be checking our node blacklist, but it should be within the bound we defined above.
         verify(stageToMockTaskSetBlacklist(0), atMost(maxBlacklistChecks))
           .isNodeBlacklistedForTaskSet(anyString())
    -    nodesToCheck.foreach { node =>
    +    // We shouldn't ever consult the per-task blacklist for the nodes that have been blacklisted
    +    // for the entire taskset, since the taskset level blacklisting should prevent scheduling
    +    // from ever looking at specific tasks.
    +    nodesWithAnyBlacklisting.foreach { node =>
           verify(stageToMockTaskSetBlacklist(0), never)
             .isNodeBlacklistedForTask(meq(node), anyInt())
         }
    -    executorsToCheck.foreach { exec =>
    -      // If we had a node-blacklist, then we could tighten the next check to *never*.  But, it also
    -      // doesn't particular matter if we the executor check happens in addition.
    +    executorsWithAnyBlacklisting.foreach { exec =>
    +      // We should be checking our executor blacklist, but it should be within the bound defined
    +      // above.  It's possible that this will be significantly fewer calls, maybe even 0, if there
    +      // is also a node-blacklist which takes effect first.  But this assert is all we need to
    +      // avoid an O(numPendingTasks) slowdown.
    --- End diff ---
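    The hunk above relies on Mockito's interaction counting to bound how often the blacklist is consulted. As a minimal generic sketch of that pattern (using a hypothetical `Blacklist` trait and Mockito 1.x-style imports matching the suite, not Spark's actual `TaskSetBlacklist` API):

        import org.mockito.Matchers.{anyInt, anyString, eq => meq}
        import org.mockito.Mockito.{atMost, mock, never, verify, when}

        // Hypothetical stand-in for the mocked taskset blacklist.
        trait Blacklist {
          def isNodeBlacklistedForTaskSet(node: String): Boolean
          def isNodeBlacklistedForTask(node: String, taskIndex: Int): Boolean
        }

        object MockitoCountSketch {
          def main(args: Array[String]): Unit = {
            val bl = mock(classOf[Blacklist])
            when(bl.isNodeBlacklistedForTaskSet("host0")).thenReturn(true)
            // Exercise the mock a few times, as the scheduler would per offer.
            (1 to 3).foreach(_ => bl.isNodeBlacklistedForTaskSet("host0"))
            // Bound the number of taskset-level checks ...
            verify(bl, atMost(5)).isNodeBlacklistedForTaskSet(anyString())
            // ... and require that the per-task check never happened at all.
            verify(bl, never).isNodeBlacklistedForTask(meq("host0"), anyInt())
          }
        }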
    
    Actually, with 3 offers, there are only 6 permutations, so 100 seems like overkill.  Maybe 10?
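    As a quick sanity check on that arithmetic (a standalone sketch, not part of the PR; the offer list is reduced to illustrative executor ids, and 10 is just the iteration count suggested above):

        import scala.util.Random

        object PermutationSanityCheck {
          def main(args: Array[String]): Unit = {
            val offers = Seq("executor0", "executor1", "executor3")
            // 3 distinct offers admit exactly 3! = 6 orderings.
            println(offers.permutations.size)  // prints 6
            // 10 random shuffles already cover most of those 6 orderings,
            // so looping 100 times buys little extra over looping 10 times.
            val seen = (1 to 10).map(_ => Random.shuffle(offers)).toSet
            println(seen.size)  // at most 6
          }
        }

    For any single ordering (e.g. the blacklisted offer first), the chance it never appears in 10 shuffles is (5/6)^10, roughly 16%, which is why a small iteration count still exercises the worst case most of the time.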

