Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18950#discussion_r133505348
  
    --- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
    @@ -1214,6 +1214,101 @@ class TaskSetManagerSuite extends SparkFunSuite 
with LocalSparkContext with Logg
         verify(taskSetManagerSpy, times(1)).addPendingTask(anyInt())
       }
     
    +  test("limit max concurrent running tasks in a job group when configured 
") {
    +    val conf = new SparkConf().
    +      set(config.BLACKLIST_ENABLED, true).
    +      set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max 
concurrent tasks to 2
    +
    +    sc = new SparkContext("local", "test", conf)
    +    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
    +    val props = new Properties();
    +    props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // 
set the job group
    +
    +    val tasks = Array.tabulate[Task[_]](10) { i =>
    +      new FakeTask(0, i, Nil)
    +    }
    +    val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, 
props), 2)
    +
    +    // make some offers to our taskset
    +    var taskDescs = Seq(
    +      "exec1" -> "host1",
    +      "exec2" -> "host1"
    +    ).flatMap { case (exec, host) =>
    +      // offer each executor twice (simulating 2 cores per executor)
    +      (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
    +    }
    +    assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up 
to maxConcurrentTasks.
    +
    +    // make 4 more offers
    +    val taskDescs2 = Seq(
    +      "exec1" -> "host1",
    +      "exec2" -> "host1"
    +    ).flatMap { case (exec, host) =>
    +      (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
    +    }
    +    assert(taskDescs2.size === 0) // tsm doesn't accept any as it is 
already running at max tasks
    +
    +    // inform tsm that one task has completed
    +    val directTaskResult = new DirectTaskResult[String](null, Seq()) {
    --- End diff --
    
    you can use `createTaskResult`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to