This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new eb9b12692601 [SPARK-47663][CORE][TESTS] add end to end test for task limiting according to different cpu and gpu configurations eb9b12692601 is described below commit eb9b126926016e0156b1d40b1d7ed33d4705d2bb Author: Bobby Wang <wbo4...@gmail.com> AuthorDate: Tue Apr 2 15:30:10 2024 +0800 [SPARK-47663][CORE][TESTS] add end to end test for task limiting according to different cpu and gpu configurations ### What changes were proposed in this pull request? Add an end-to-end unit test to ensure that the number of tasks is calculated correctly according to the different task CPU amount and task GPU amount. ### Why are the changes needed? To increase the test coverage. More details can be found at https://github.com/apache/spark/pull/45528#discussion_r1545905575 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? The CI can pass. ### Was this patch authored or co-authored using generative AI tooling? No Closes #45794 from wbo4958/end2end-test. Authored-by: Bobby Wang <wbo4...@gmail.com> Signed-off-by: Weichen Xu <weichen...@databricks.com> --- .../CoarseGrainedSchedulerBackendSuite.scala | 47 ++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index 6e94f9abe67b..a75f470deec3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -179,6 +179,53 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo } } + // Every item corresponds to (CPU resources per task, GPU resources per task, + // and the GPU addresses assigned to all tasks). 
+ Seq( + (1, 1, Array(Array("0"), Array("1"), Array("2"), Array("3"))), + (1, 2, Array(Array("0", "1"), Array("2", "3"))), + (1, 4, Array(Array("0", "1", "2", "3"))), + (2, 1, Array(Array("0"), Array("1"))), + (4, 1, Array(Array("0"))), + (4, 2, Array(Array("0", "1"))), + (2, 2, Array(Array("0", "1"), Array("2", "3"))), + (4, 4, Array(Array("0", "1", "2", "3"))), + (1, 3, Array(Array("0", "1", "2"))), + (3, 1, Array(Array("0"))) + ).foreach { case (taskCpus, taskGpus, expectedGpuAddresses) => + test(s"SPARK-47663 end to end test validating if task cpus:${taskCpus} and " + + s"task gpus: ${taskGpus} works") { + withTempDir { dir => + val discoveryScript = createTempScriptWithExpectedOutput( + dir, "gpuDiscoveryScript", """{"name": "gpu","addresses":["0", "1", "2", "3"]}""") + val conf = new SparkConf() + .set(CPUS_PER_TASK, taskCpus) + .setMaster("local-cluster[1, 4, 1024]") + .setAppName("test") + .set(WORKER_GPU_ID.amountConf, "4") + .set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript) + .set(EXECUTOR_GPU_ID.amountConf, "4") + .set(TASK_GPU_ID.amountConf, taskGpus.toString) + + sc = new SparkContext(conf) + eventually(timeout(executorUpTimeout)) { + // Ensure all executors have been launched. + assert(sc.getExecutorIds().length == 1) + } + + val numPartitions = Seq(4 / taskCpus, 4 / taskGpus).min + val ret = sc.parallelize(1 to 20, numPartitions).mapPartitions { _ => + val tc = TaskContext.get() + assert(tc.cpus() == taskCpus) + val gpus = tc.resources()("gpu").addresses + Iterator.single(gpus) + }.collect() + + assert(ret === expectedGpuAddresses) + } + } + } + // Here we just have test for one happy case instead of all cases: other cases are covered in // FsHistoryProviderSuite. test("custom log url for Spark UI is applied") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org