This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new f36d1bf [SPARK-38423][K8S] Reuse driver pod's `priorityClassName` for `PodGroup` f36d1bf is described below commit f36d1bfba47f6f6ff0f4375a1eb74bb606f8a0b7 Author: Yikun Jiang <yikunk...@gmail.com> AuthorDate: Sun Mar 6 23:54:18 2022 -0800 [SPARK-38423][K8S] Reuse driver pod's `priorityClassName` for `PodGroup` ### What changes were proposed in this pull request? This patch sets the PodGroup `priorityClassName` to `driver.pod.spec.priorityClassName`. ### Why are the changes needed? To support priority scheduling with the Volcano implementation. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - New UT to make sure the feature step sets the PodGroup priority as expected. - Add two integration tests: - 1. Submit 3 jobs with different priorities (SparkPi) to make sure all jobs complete as expected. - 2. Submit 3 jobs with different priorities (driver submission) to make sure the jobs are scheduled in the expected order. - All existing UTs and ITs. Closes #35639 from Yikun/SPARK-38189. 
Authored-by: Yikun Jiang <yikunk...@gmail.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../deploy/k8s/features/VolcanoFeatureStep.scala | 6 + .../k8s/features/VolcanoFeatureStepSuite.scala | 30 ++++ .../src/test/resources/volcano/disable-queue.yml | 24 +++ .../src/test/resources/volcano/enable-queue.yml | 24 +++ .../volcano/high-priority-driver-template.yml | 26 ++++ .../volcano/low-priority-driver-template.yml | 26 ++++ .../volcano/medium-priority-driver-template.yml | 26 ++++ .../src/test/resources/volcano/priorityClasses.yml | 33 +++++ .../k8s/integrationtest/VolcanoTestsSuite.scala | 163 ++++++++++++++++++--- 9 files changed, 340 insertions(+), 18 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala index c6efe4d..48303c8 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala @@ -32,6 +32,7 @@ private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureCon private lazy val podGroupName = s"${kubernetesConf.appId}-podgroup" private lazy val namespace = kubernetesConf.namespace private lazy val queue = kubernetesConf.get(KUBERNETES_JOB_QUEUE) + private var priorityClassName: Option[String] = None override def init(config: KubernetesDriverConf): Unit = { kubernetesConf = config @@ -50,10 +51,15 @@ private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureCon queue.foreach(podGroup.editOrNewSpec().withQueue(_).endSpec()) + priorityClassName.foreach(podGroup.editOrNewSpec().withPriorityClassName(_).endSpec()) + Seq(podGroup.build()) } override def configurePod(pod: SparkPod): SparkPod = { + + priorityClassName = 
Some(pod.pod.getSpec.getPriorityClassName) + val k8sPodBuilder = new PodBuilder(pod.pod) .editMetadata() .addToAnnotations(POD_GROUP_ANNOTATION, podGroupName) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala index eda1ccc..350df77 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.deploy.k8s.features +import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder} import io.fabric8.volcano.scheduling.v1beta1.PodGroup import org.apache.spark.{SparkConf, SparkFunSuite} @@ -57,4 +58,33 @@ class VolcanoFeatureStepSuite extends SparkFunSuite { val annotations = configuredPod.pod.getMetadata.getAnnotations assert(annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup") } + + test("SPARK-38423: Support priorityClassName") { + // test null priority + val podWithNullPriority = SparkPod.initialPod() + assert(podWithNullPriority.pod.getSpec.getPriorityClassName === null) + verifyPriority(SparkPod.initialPod()) + // test normal priority + val podWithPriority = SparkPod( + new PodBuilder() + .withNewMetadata() + .endMetadata() + .withNewSpec() + .withPriorityClassName("priority") + .endSpec() + .build(), + new ContainerBuilder().build()) + assert(podWithPriority.pod.getSpec.getPriorityClassName === "priority") + verifyPriority(podWithPriority) + } + + private def verifyPriority(pod: SparkPod): Unit = { + val sparkConf = new SparkConf() + val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf) + val step = new VolcanoFeatureStep() + step.init(kubernetesConf) + val sparkPod = 
step.configurePod(pod) + val podGroup = step.getAdditionalPreKubernetesResources().head.asInstanceOf[PodGroup] + assert(podGroup.getSpec.getPriorityClassName === sparkPod.pod.getSpec.getPriorityClassName) + } } diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/disable-queue.yml b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/disable-queue.yml new file mode 100644 index 0000000..909102d --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/disable-queue.yml @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +apiVersion: scheduling.volcano.sh/v1beta1 +kind: Queue +metadata: + name: queue +spec: + weight: 0 + capability: + cpu: "1" diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/enable-queue.yml b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/enable-queue.yml new file mode 100644 index 0000000..e753b8c --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/enable-queue.yml @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +apiVersion: scheduling.volcano.sh/v1beta1 +kind: Queue +metadata: + name: queue +spec: + weight: 1 + capability: + cpu: "1" diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/high-priority-driver-template.yml b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/high-priority-driver-template.yml new file mode 100644 index 0000000..a7968bf --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/high-priority-driver-template.yml @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +apiVersion: v1 +Kind: Pod +metadata: + labels: + template-label-key: driver-template-label-value +spec: + priorityClassName: high + containers: + - name: test-driver-container + image: will-be-overwritten diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/low-priority-driver-template.yml b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/low-priority-driver-template.yml new file mode 100644 index 0000000..7f04b9e --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/low-priority-driver-template.yml @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +apiVersion: v1 +Kind: Pod +metadata: + labels: + template-label-key: driver-template-label-value +spec: + priorityClassName: low + containers: + - name: test-driver-container + image: will-be-overwritten diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/medium-priority-driver-template.yml b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/medium-priority-driver-template.yml new file mode 100644 index 0000000..78d9295 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/medium-priority-driver-template.yml @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +apiVersion: v1 +Kind: Pod +metadata: + labels: + template-label-key: driver-template-label-value +spec: + priorityClassName: medium + containers: + - name: test-driver-container + image: will-be-overwritten diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/priorityClasses.yml b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/priorityClasses.yml new file mode 100644 index 0000000..64e9b0d --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/priorityClasses.yml @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +apiVersion: scheduling.k8s.io/v1 +kind: PriorityClass +metadata: + name: high +value: 100 +--- +apiVersion: scheduling.k8s.io/v1 +kind: PriorityClass +metadata: + name: medium +value: 50 +--- +apiVersion: scheduling.k8s.io/v1 +kind: PriorityClass +metadata: + name: low +value: 0 diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala index f918381..803a8d3 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.k8s.integrationtest import java.io.{File, FileInputStream} +import java.time.Instant import java.util.UUID import scala.collection.JavaConverters._ @@ -40,7 +41,8 @@ import org.apache.spark.internal.config.NETWORK_AUTH_ENABLED private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: KubernetesSuite => import VolcanoTestsSuite._ import org.apache.spark.deploy.k8s.integrationtest.VolcanoSuite.volcanoTag - import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{k8sTestTag, INTERVAL, TIMEOUT} + import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{k8sTestTag, INTERVAL, TIMEOUT, + SPARK_DRIVER_MAIN_CLASS} lazy val volcanoClient: VolcanoClient = kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient]) @@ -95,12 +97,15 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku protected def checkPodGroup( pod: Pod, - queue: Option[String] = None): Unit = { + queue: Option[String] = None, + priorityClassName: Option[String] = None): Unit = { val appId = 
pod.getMetadata.getLabels.get("spark-app-selector") val podGroupName = s"$appId-podgroup" val podGroup = volcanoClient.podGroups().withName(podGroupName).get() assert(podGroup.getMetadata.getOwnerReferences.get(0).getName === pod.getMetadata.getName) queue.foreach(q => assert(q === podGroup.getSpec.getQueue)) + priorityClassName.foreach(_ => + assert(pod.getSpec.getPriorityClassName === podGroup.getSpec.getPriorityClassName)) } private def createOrReplaceYAMLResource(yamlPath: String): Unit = { @@ -128,31 +133,73 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku def runJobAndVerify( batchSuffix: String, groupLoc: Option[String] = None, - queue: Option[String] = None): Unit = { + queue: Option[String] = None, + driverTemplate: Option[String] = None, + isDriverJob: Boolean = false): Unit = { val appLoc = s"${appLocator}${batchSuffix}" val podName = s"${driverPodName}-${batchSuffix}" // create new configuration for every job - val conf = createVolcanoSparkConf(podName, appLoc, groupLoc, queue) - runSparkPiAndVerifyCompletion( - driverPodChecker = (driverPod: Pod) => { - checkScheduler(driverPod) - checkAnnotaion(driverPod) - checkPodGroup(driverPod, queue) - }, - executorPodChecker = (executorPod: Pod) => { - checkScheduler(executorPod) - checkAnnotaion(executorPod) - }, - customSparkConf = Option(conf), - customAppLocator = Option(appLoc) - ) + val conf = createVolcanoSparkConf(podName, appLoc, groupLoc, queue, driverTemplate) + if (isDriverJob) { + runSparkDriverSubmissionAndVerifyCompletion( + driverPodChecker = (driverPod: Pod) => { + checkScheduler(driverPod) + checkAnnotaion(driverPod) + checkPodGroup(driverPod, queue) + }, + customSparkConf = Option(conf), + customAppLocator = Option(appLoc) + ) + } else { + runSparkPiAndVerifyCompletion( + driverPodChecker = (driverPod: Pod) => { + checkScheduler(driverPod) + checkAnnotaion(driverPod) + checkPodGroup(driverPod, queue) + }, + executorPodChecker = (executorPod: Pod) => { + 
checkScheduler(executorPod) + checkAnnotaion(executorPod) + }, + customSparkConf = Option(conf), + customAppLocator = Option(appLoc) + ) + } + } + + protected def runSparkDriverSubmissionAndVerifyCompletion( + appResource: String = containerLocalSparkDistroExamplesJar, + mainClass: String = SPARK_DRIVER_MAIN_CLASS, + driverPodChecker: Pod => Unit = doBasicDriverPodCheck, + appArgs: Array[String] = Array("2"), + customSparkConf: Option[SparkAppConf] = None, + customAppLocator: Option[String] = None): Unit = { + val appArguments = SparkAppArguments( + mainAppResource = appResource, + mainClass = mainClass, + appArgs = appArgs) + SparkAppLauncher.launch( + appArguments, + customSparkConf.getOrElse(sparkAppConf), + TIMEOUT.value.toSeconds.toInt, + sparkHomeDir, + true) + val driverPod = kubernetesTestComponents.kubernetesClient + .pods() + .withLabel("spark-app-locator", customAppLocator.getOrElse(appLocator)) + .withLabel("spark-role", "driver") + .list() + .getItems + .get(0) + driverPodChecker(driverPod) } private def createVolcanoSparkConf( driverPodName: String = driverPodName, appLoc: String = appLocator, groupLoc: Option[String] = None, - queue: Option[String] = None): SparkAppConf = { + queue: Option[String] = None, + driverTemplate: Option[String] = None): SparkAppConf = { val conf = kubernetesTestComponents.newSparkAppConf() .set(CONTAINER_IMAGE.key, image) .set(KUBERNETES_DRIVER_POD_NAME.key, driverPodName) @@ -168,6 +215,7 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku conf.set(s"${KUBERNETES_DRIVER_LABEL_PREFIX}spark-group-locator", locator) conf.set(s"${KUBERNETES_EXECUTOR_LABEL_PREFIX}spark-group-locator", locator) } + driverTemplate.foreach(conf.set(KUBERNETES_DRIVER_PODTEMPLATE_FILE.key, _)) conf } @@ -229,6 +277,77 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku assert(completedPods.size === jobNum) } } + + test("SPARK-38423: Run SparkPi Jobs with priorityClassName", k8sTestTag, 
volcanoTag) { + // Prepare the priority resource + createOrReplaceYAMLResource(VOLCANO_PRIORITY_YAML) + val priorities = Seq("low", "medium", "high") + val groupName = generateGroupName("priority") + priorities.foreach { p => + Future { + val templatePath = new File( + getClass.getResource(s"/volcano/$p-priority-driver-template.yml").getFile + ).getAbsolutePath + runJobAndVerify( + p, groupLoc = Option(groupName), + driverTemplate = Option(templatePath) + ) + } + } + // Make sure all jobs are Succeeded + Eventually.eventually(TIMEOUT, INTERVAL) { + val pods = getPods(role = "driver", groupName, statusPhase = "Succeeded") + assert(pods.size === priorities.size) + } + } + + test("SPARK-38423: Run driver job to validate priority order", k8sTestTag, volcanoTag) { + // Prepare the priority resource and queue + createOrReplaceYAMLResource(DISABLE_QUEUE) + createOrReplaceYAMLResource(VOLCANO_PRIORITY_YAML) + // Submit 3 jobs with different priority + val priorities = Seq("low", "medium", "high") + priorities.foreach { p => + Future { + val templatePath = new File( + getClass.getResource(s"/volcano/$p-priority-driver-template.yml").getFile + ).getAbsolutePath + val groupName = generateGroupName(p) + runJobAndVerify( + p, groupLoc = Option(groupName), + queue = Option("queue"), + driverTemplate = Option(templatePath), + isDriverJob = true + ) + } + } + // Make sure 3 jobs are pending + Eventually.eventually(TIMEOUT, INTERVAL) { + priorities.foreach { p => + val pods = getPods(role = "driver", s"$GROUP_PREFIX$p", statusPhase = "Pending") + assert(pods.size === 1) + } + } + + // Enable queue to let jobs running one by one + createOrReplaceYAMLResource(ENABLE_QUEUE) + + // Verify scheduling order follow the specified priority + Eventually.eventually(TIMEOUT, INTERVAL) { + var m = Map.empty[String, Instant] + priorities.foreach { p => + val pods = getPods(role = "driver", s"$GROUP_PREFIX$p", statusPhase = "Succeeded") + assert(pods.size === 1) + val conditions = 
pods.head.getStatus.getConditions.asScala + val scheduledTime + = conditions.filter(_.getType === "PodScheduled").head.getLastTransitionTime + m += (p -> Instant.parse(scheduledTime)) + } + // high --> medium --> low + assert(m("high").isBefore(m("medium"))) + assert(m("medium").isBefore(m("low"))) + } + } } private[spark] object VolcanoTestsSuite extends SparkFunSuite { @@ -240,4 +359,12 @@ private[spark] object VolcanoTestsSuite extends SparkFunSuite { getClass.getResource("/volcano/disable-queue0-enable-queue1.yml").getFile ).getAbsolutePath val GROUP_PREFIX = "volcano-test" + UUID.randomUUID().toString.replaceAll("-", "") + "-" + val VOLCANO_PRIORITY_YAML + = new File(getClass.getResource("/volcano/priorityClasses.yml").getFile).getAbsolutePath + val ENABLE_QUEUE = new File( + getClass.getResource("/volcano/enable-queue.yml").getFile + ).getAbsolutePath + val DISABLE_QUEUE = new File( + getClass.getResource("/volcano/disable-queue.yml").getFile + ).getAbsolutePath } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org