Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r139157280 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(taskOption4.get.addedJars === addedJarsMidTaskSet) } + test("limit max concurrent running tasks in a job group when configured ") { + val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true). + set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max concurrent tasks to 2 + + sc = new SparkContext("local", "test", conf) + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + val props = new Properties(); + props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // set the job group + + val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) + } + val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, props), 2) + + // make some offers to our taskset + var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" + ).flatMap { case (exec, host) => + // offer each executor twice (simulating 2 cores per executor) + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} + } + assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up to maxConcurrentTasks. 
+ + // make 4 more offers + val taskDescs2 = Seq( + "exec1" -> "host1", + "exec2" -> "host1" + ).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} + } + assert(taskDescs2.size === 0) // tsm doesn't accept any as it is already running at max tasks + + // inform tsm that one task has completed + val directTaskResult = createTaskResult(0) + tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult) + + // make 4 more offers after previous task completed + taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" + ).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} + } + assert(taskDescs.size === 1) // tsm accepts one as it can run one more task + } + + test("do not limit max concurrent running tasks in a job group by default") { + val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true) + + sc = new SparkContext("local", "test", conf) + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + + val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) + } + val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, null), 2) + + // make 5 offers to our taskset + var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host2" + ).flatMap { case (exec, host) => + // offer each executor twice (simulating 2 cores per executor) --- End diff -- I think you are doing 3 cores per executor here
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org