This is an automated email from the ASF dual-hosted git repository.

yikun pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 821997bec37 [SPARK-41253][K8S][TESTS] Make Spark K8S volcano IT work in Github Action
821997bec37 is described below

commit 821997bec3703ec52db9b1deb667e11e76296c48
Author: Yikun Jiang <yikunk...@gmail.com>
AuthorDate: Fri Dec 2 22:44:50 2022 -0800

[SPARK-41253][K8S][TESTS] Make Spark K8S volcano IT work in Github Action

### What changes were proposed in this pull request?
This patch makes it possible to run the Spark K8s Volcano IT in the resource-limited GitHub Actions environment. It helps downstream communities such as Volcano enable the Spark IT in GitHub Actions.

Note that there is no plan to enable the Volcano test in the Spark community. This patch only makes the test work; it does **NOT** enable the Volcano test in Apache Spark GA. It is intended to help downstream testing.

- Change the parallel job number from 4 to 2 (only 1 job in each queue) when running in the GitHub Actions environment.
- Read the specified `spark.kubernetes.[driver|executor].request.cores`.
- Set the queue limit according to the specified `[driver|executor].request.cores`, just as we did for the normal tests: https://github.com/apache/spark/commit/883a481e44a1f91ef3fc3aea2838a598cbd6cf0f

### Why are the changes needed?
It helps downstream communities that want to use free GitHub Actions hosted resources to enable the Spark IT in GitHub Actions.

### Does this PR introduce _any_ user-facing change?
No, test only.

### How was this patch tested?
- Test on my local env with enough resource (default):

```
$ build/sbt -Pvolcano -Pkubernetes -Pkubernetes-integration-tests -Dtest.include.tags=volcano "kubernetes-integration-tests/test"

[info] KubernetesSuite:
[info] VolcanoSuite:
[info] - Run SparkPi with volcano scheduler (10 seconds, 410 milliseconds)
[info] - SPARK-38187: Run SparkPi Jobs with minCPU (25 seconds, 489 milliseconds)
[info] - SPARK-38187: Run SparkPi Jobs with minMemory (25 seconds, 518 milliseconds)
[info] - SPARK-38188: Run SparkPi jobs with 2 queues (only 1 enabled) (14 seconds, 349 milliseconds)
[info] - SPARK-38188: Run SparkPi jobs with 2 queues (all enabled) (23 seconds, 516 milliseconds)
[info] - SPARK-38423: Run driver job to validate priority order (16 seconds, 404 milliseconds)
[info] YuniKornSuite:
[info] Run completed in 2 minutes, 34 seconds.
[info] Total number of tests run: 6
[info] Suites: completed 3, aborted 0
[info] Tests: succeeded 6, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 439 s (07:19), completed 2022-12-3 8:58:50
```

- Test on Github Action with `volcanoMaxConcurrencyJobNum`: https://github.com/Yikun/spark/pull/192

```
$ build/sbt -Pvolcano -Psparkr -Pkubernetes -Pkubernetes-integration-tests -Dspark.kubernetes.test.driverRequestCores=0.5 -Dspark.kubernetes.test.executorRequestCores=0.2 -Dspark.kubernetes.test.volcanoMaxConcurrencyJobNum=1 -Dtest.include.tags=volcano "kubernetes-integration-tests/test"

[info] VolcanoSuite:
[info] - Run SparkPi with volcano scheduler (18 seconds, 122 milliseconds)
[info] - SPARK-38187: Run SparkPi Jobs with minCPU (53 seconds, 964 milliseconds)
[info] - SPARK-38187: Run SparkPi Jobs with minMemory (54 seconds, 523 milliseconds)
[info] - SPARK-38188: Run SparkPi jobs with 2 queues (only 1 enabled) (22 seconds, 185 milliseconds)
[info] - SPARK-38188: Run SparkPi jobs with 2 queues (all enabled) (33 seconds, 349 milliseconds)
[info] - SPARK-38423: Run driver job to validate priority order (32 seconds, 435 milliseconds)
[info] YuniKornSuite:
[info] Run completed in 4 minutes, 16 seconds.
[info] Total number of tests run: 6
[info] Suites: completed 3, aborted 0
[info] Tests: succeeded 6, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[warn] In the last 494 seconds, 7.296 (1.5%) were spent in GC. [Heap: 3.12GB free of 3.83GB, max 3.83GB] Consider increasing the JVM heap using `-Xmx` or try a different collector, e.g. `-XX:+UseG1GC`, for better performance.
[success] Total time: 924 s (15:24), completed Dec 3, 2022 12:49:42 AM
```

- CI passed

Closes #38789 from Yikun/SPARK-41253.

Authored-by: Yikun Jiang <yikunk...@gmail.com>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
(cherry picked from commit 72d58d5f8a847bac53cf01b137780c7e4e2664d7)
Signed-off-by: Yikun Jiang <yikunk...@gmail.com>
---
 .../kubernetes/integration-tests/README.md         |  8 ++++
 .../volcano/driver-podgroup-template-cpu-2u.yml    | 23 ----------
 .../deploy/k8s/integrationtest/TestConstants.scala |  2 +
 .../k8s/integrationtest/VolcanoTestsSuite.scala    | 52 +++++++++++++++++-----
 4 files changed, 51 insertions(+), 34 deletions(-)

diff --git a/resource-managers/kubernetes/integration-tests/README.md b/resource-managers/kubernetes/integration-tests/README.md
index af0b1ec3dc7..21e6467724c 100644
--- a/resource-managers/kubernetes/integration-tests/README.md
+++ b/resource-managers/kubernetes/integration-tests/README.md
@@ -284,6 +284,14 @@ to the wrapper scripts and using the wrapper scripts will simply set these appro
     </td>
     <td></td>
   </tr>
+  <tr>
+    <td><code>spark.kubernetes.test.volcanoMaxConcurrencyJobNum</code></td>
+    <td>
+      Set maximum number for concurrency jobs, It helps developers setting suitable resources according to test env in
+      volcano test.
+    </td>
+    <td></td>
+  </tr>
 </table>
 
 # Running the Kubernetes Integration Tests with SBT
diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/driver-podgroup-template-cpu-2u.yml b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/driver-podgroup-template-cpu-2u.yml
deleted file mode 100644
index 4a784f0f864..00000000000
--- a/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/driver-podgroup-template-cpu-2u.yml
+++ /dev/null
@@ -1,23 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-apiVersion: scheduling.volcano.sh/v1beta1
-kind: PodGroup
-spec:
-  queue: queue-2u
-  minMember: 1
-  minResources:
-    cpu: "2"
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConstants.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConstants.scala
index 2175d23d449..bf001666c2e 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConstants.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConstants.scala
@@ -36,4 +36,6 @@ object TestConstants {
   val CONFIG_KEY_UNPACK_DIR = "spark.kubernetes.test.unpackSparkDir"
   val CONFIG_DRIVER_REQUEST_CORES = "spark.kubernetes.test.driverRequestCores"
   val CONFIG_EXECUTOR_REQUEST_CORES = "spark.kubernetes.test.executorRequestCores"
+
+  val CONFIG_KEY_VOLCANO_MAX_JOB_NUM = "spark.kubernetes.test.volcanoMaxConcurrencyJobNum"
 }
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
index c1b637523dd..f37a7644a94 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
@@ -37,6 +37,7 @@ import org.scalatest.concurrent.Eventually
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.features.VolcanoFeatureStep
+import org.apache.spark.deploy.k8s.integrationtest.TestConstants.{CONFIG_DRIVER_REQUEST_CORES, CONFIG_EXECUTOR_REQUEST_CORES, CONFIG_KEY_VOLCANO_MAX_JOB_NUM}
 import org.apache.spark.internal.config.NETWORK_AUTH_ENABLED
 
 private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: KubernetesSuite =>
@@ -51,6 +52,9 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
   private val testGroups: mutable.Set[String] = mutable.Set.empty
   private val testYAMLPaths: mutable.Set[String] = mutable.Set.empty
   private val testResources: mutable.Set[HasMetadata] = mutable.Set.empty
+  private val driverCores = java.lang.Double.parseDouble(DRIVER_REQUEST_CORES)
+  private val executorCores = java.lang.Double.parseDouble(EXECUTOR_REQUEST_CORES)
+  private val maxConcurrencyJobNum = VOLCANO_MAX_JOB_NUM.toInt
 
   private def deletePodInTestGroup(): Unit = {
     testGroups.foreach { g =>
@@ -252,6 +256,12 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
       .set(KUBERNETES_SCHEDULER_NAME.key, "volcano")
       .set(KUBERNETES_DRIVER_POD_FEATURE_STEPS.key, VOLCANO_FEATURE_STEP)
       .set(KUBERNETES_EXECUTOR_POD_FEATURE_STEPS.key, VOLCANO_FEATURE_STEP)
+    sys.props.get(CONFIG_DRIVER_REQUEST_CORES).foreach { cpu =>
+      conf.set("spark.kubernetes.driver.request.cores", cpu)
+    }
+    sys.props.get(CONFIG_EXECUTOR_REQUEST_CORES).foreach { cpu =>
+      conf.set("spark.kubernetes.executor.request.cores", cpu)
+    }
     queue.foreach { q =>
       conf.set(VolcanoFeatureStep.POD_GROUP_TEMPLATE_FILE_KEY,
         new File(
@@ -303,8 +313,22 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
 
   test("SPARK-38187: Run SparkPi Jobs with minCPU", k8sTestTag, volcanoTag) {
     val groupName = generateGroupName("min-cpu")
-    // Create a queue with 2 CPU capacity
-    createOrReplaceQueue(name = "queue-2u", cpu = Some("2"))
+    // Create a queue with driver + executor CPU capacity
+    val jobCores = driverCores + executorCores
+    val queueName = s"queue-$jobCores"
+    createOrReplaceQueue(name = queueName, cpu = Some(s"$jobCores"))
+    val testContent =
+      s"""
+         |apiVersion: scheduling.volcano.sh/v1beta1
+         |kind: PodGroup
+         |spec:
+         |  queue: $queueName
+         |  minMember: 1
+         |  minResources:
+         |    cpu: $jobCores
+         |""".stripMargin
+    val file = Utils.createTempFile(testContent, TEMP_DIR)
+    val path = TEMP_DIR + file
     // Submit 3 jobs with minCPU = 2
     val jobNum = 3
     (1 to jobNum).map { i =>
@@ -312,7 +336,7 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
         runJobAndVerify(
           i.toString,
           groupLoc = Option(groupName),
-          driverPodGroupTemplate = Option(DRIVER_PG_TEMPLATE_CPU_2U))
+          driverPodGroupTemplate = Option(path))
       }
     }
     verifyJobsSucceededOneByOne(jobNum, groupName)
@@ -339,8 +363,10 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
     // Disabled queue0 and enabled queue1
     createOrReplaceQueue(name = "queue0", cpu = Some("0.001"))
     createOrReplaceQueue(name = "queue1")
+    val QUEUE_NUMBER = 2
     // Submit jobs into disabled queue0 and enabled queue1
-    val jobNum = 4
+    // By default is 4 (2 jobs in each queue)
+    val jobNum = maxConcurrencyJobNum * QUEUE_NUMBER
     (1 to jobNum).foreach { i =>
       Future {
         val queueName = s"queue${i % 2}"
@@ -351,9 +377,9 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
     // There are two `Succeeded` jobs and two `Pending` jobs
     Eventually.eventually(TIMEOUT, INTERVAL) {
       val completedPods = getPods("driver", s"${GROUP_PREFIX}queue1", "Succeeded")
-      assert(completedPods.size === 2)
+      assert(completedPods.size === jobNum/2)
       val pendingPods = getPods("driver", s"${GROUP_PREFIX}queue0", "Pending")
-      assert(pendingPods.size === 2)
+      assert(pendingPods.size === jobNum/2)
     }
   }
 
@@ -362,7 +388,10 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
     // Enable all queues
     createOrReplaceQueue(name = "queue1")
     createOrReplaceQueue(name = "queue0")
-    val jobNum = 4
+    val QUEUE_NUMBER = 2
+    // Submit jobs into disabled queue0 and enabled queue1
+    // By default is 4 (2 jobs in each queue)
+    val jobNum = maxConcurrencyJobNum * QUEUE_NUMBER
     // Submit jobs into these two queues
     (1 to jobNum).foreach { i =>
       Future {
@@ -410,7 +439,7 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
     }
 
     // Enable queue to let jobs running one by one
-    createOrReplaceQueue(name = "queue", cpu = Some("1"))
+    createOrReplaceQueue(name = "queue", cpu = Some(s"$driverCores"))
 
     // Verify scheduling order follow the specified priority
     Eventually.eventually(TIMEOUT, INTERVAL) {
@@ -435,10 +464,11 @@ private[spark] object VolcanoTestsSuite extends SparkFunSuite {
   val GROUP_PREFIX = "volcano-test" + UUID.randomUUID().toString.replaceAll("-", "") + "-"
   val VOLCANO_PRIORITY_YAML =
     new File(getClass.getResource("/volcano/priorityClasses.yml").getFile).getAbsolutePath
-  val DRIVER_PG_TEMPLATE_CPU_2U = new File(
-    getClass.getResource("/volcano/driver-podgroup-template-cpu-2u.yml").getFile
-  ).getAbsolutePath
   val DRIVER_PG_TEMPLATE_MEMORY_3G = new File(
     getClass.getResource("/volcano/driver-podgroup-template-memory-3g.yml").getFile
   ).getAbsolutePath
+  val DRIVER_REQUEST_CORES = sys.props.get(CONFIG_DRIVER_REQUEST_CORES).getOrElse("1")
+  val EXECUTOR_REQUEST_CORES = sys.props.get(CONFIG_EXECUTOR_REQUEST_CORES).getOrElse("1")
+  val VOLCANO_MAX_JOB_NUM = sys.props.get(CONFIG_KEY_VOLCANO_MAX_JOB_NUM).getOrElse("2")
+  val TEMP_DIR = "/tmp/"
 }
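
For readers following the VolcanoTestsSuite.scala changes, the way the test sizes its queue and job count from system properties can be sketched in isolation. This is a minimal illustration under stated assumptions, not code from the patch: the object `VolcanoTestSizing`, the case class `Sizing`, and the `props` map (standing in for `sys.props`) are hypothetical names. Only the property keys and the defaults ("1", "1", "2") are taken from the diff.

```scala
// Hypothetical standalone sketch; the real logic lives in VolcanoTestsSuite.scala.
object VolcanoTestSizing {
  // Property keys as they appear in TestConstants.scala in the diff.
  val DriverCoresKey = "spark.kubernetes.test.driverRequestCores"
  val ExecutorCoresKey = "spark.kubernetes.test.executorRequestCores"
  val MaxJobNumKey = "spark.kubernetes.test.volcanoMaxConcurrencyJobNum"

  // Hypothetical container for the derived values (not part of the patch).
  final case class Sizing(queueCpu: Double, queueName: String, jobNum: Int)

  // `props` stands in for sys.props so the sketch does not touch JVM state.
  // `queueNumber` mirrors the QUEUE_NUMBER = 2 constant used by the two-queue tests.
  def sizing(props: Map[String, String], queueNumber: Int = 2): Sizing = {
    val driverCores = props.getOrElse(DriverCoresKey, "1").toDouble
    val executorCores = props.getOrElse(ExecutorCoresKey, "1").toDouble
    val maxJobsPerQueue = props.getOrElse(MaxJobNumKey, "2").toInt
    // The minCPU test gives the queue capacity for exactly one driver + one executor.
    val jobCores = driverCores + executorCores
    Sizing(jobCores, s"queue-$jobCores", maxJobsPerQueue * queueNumber)
  }

  def main(args: Array[String]): Unit = {
    // Defaults, as in a local run with enough resources.
    println(sizing(Map.empty))
    // Settings matching the GitHub Actions command line in the commit message.
    println(sizing(Map(
      DriverCoresKey -> "0.5",
      ExecutorCoresKey -> "0.2",
      MaxJobNumKey -> "1")))
  }
}
```

With no properties set, the sketch yields a 2-core queue and 4 jobs for the two-queue tests. With `volcanoMaxConcurrencyJobNum=1`, as in the GitHub Actions run in the commit message, the two-queue tests submit 2 jobs, one per queue.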