viirya commented on code in PR #55642: URL: https://github.com/apache/spark/pull/55642#discussion_r3172258448
########## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPVCResizePlugin.scala: ########## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.{Map => JMap} +import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit} + +import scala.jdk.CollectionConverters._ + +import io.fabric8.kubernetes.api.model.{PersistentVolumeClaimBuilder, Pod, Quantity} +import io.fabric8.kubernetes.client.KubernetesClient +import io.fabric8.kubernetes.client.dsl.base.PatchContext +import io.fabric8.kubernetes.client.dsl.base.PatchType + +import org.apache.spark.{SparkContext, SparkEnv} +import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.internal.Logging +import org.apache.spark.internal.LogKeys.{CURRENT_DISK_SIZE, ORIGINAL_DISK_SIZE, PVC_METADATA_NAME} +import org.apache.spark.util.ThreadUtils + +/** + * Spark plugin to monitor executor PVC disk usage and grow the PVC storage request + * when the usage exceeds a configurable 
threshold. + * + * Executors measure their own local-directory usage (via DiskBlockManager) and report + * it to the driver through the plugin RPC channel. The driver maps each reported + * mount path back to the executor pod's PVC and patches the PVC's + * `spec.resources.requests.storage` to grow it. The underlying StorageClass must + * have `allowVolumeExpansion: true`. + */ +class ExecutorPVCResizePlugin extends SparkPlugin { + override def driverPlugin(): DriverPlugin = new ExecutorPVCResizeDriverPlugin() + + override def executorPlugin(): ExecutorPlugin = new ExecutorPVCResizeExecutorPlugin() +} + +/** + * Message sent from each executor to the driver with the maximum filesystem usage + * ratio (used / total) across the executor's SPARK_LOCAL_DIRS. The driver applies + * this ratio to every PVC mounted by the reporting executor's pod. + */ +private[k8s] case class PVCDiskUsageReport( + executorId: String, + ratio: Double) + +class ExecutorPVCResizeDriverPlugin extends DriverPlugin with Logging { + private var sparkContext: SparkContext = _ + private var namespace: String = _ + private var threshold: Double = _ + private var factor: Double = _ + + private val latestReports = new ConcurrentHashMap[String, PVCDiskUsageReport]() + private val failedPvcs = ConcurrentHashMap.newKeySet[String]() + + private val periodicService: ScheduledExecutorService = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("pvc-resize-plugin") + + override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = { + val interval = sc.conf.get(PVC_RESIZE_INTERVAL) + if (interval <= 0) { + logInfo("PVCResizePlugin disabled (interval <= 0).") + return Map.empty[String, String].asJava + } + threshold = sc.conf.get(PVC_RESIZE_THRESHOLD) + factor = sc.conf.get(PVC_RESIZE_FACTOR) + namespace = sc.conf.get(KUBERNETES_NAMESPACE) + sparkContext = sc + + periodicService.scheduleAtFixedRate(() => if (!sparkContext.isStopped) { + try { + checkAndResizePVCs() + } catch { + case e: 
Throwable => logError("Error in PVC resize thread", e) + } + }, interval, interval, TimeUnit.MINUTES) + logInfo("ExecutorPVCResizeDriverPlugin is scheduled") + + // Propagate the interval to executors so they report at the same cadence. + Map(PVC_RESIZE_INTERVAL.key -> interval.toString).asJava + } + + override def receive(message: Any): AnyRef = message match { + case r: PVCDiskUsageReport => + latestReports.put(r.executorId, r) + null + case _ => + null + } + + override def shutdown(): Unit = { + periodicService.shutdown() + } + + private[k8s] def checkAndResizePVCs(): Unit = { + logInfo(s"Latest PVC usage reports: $latestReports") + val appId = sparkContext.applicationId + + sparkContext.schedulerBackend match { + case b: KubernetesClusterSchedulerBackend => + val client = b.kubernetesClient + val pods = client.pods() + .inNamespace(namespace) + .withLabel(SPARK_APP_ID_LABEL, appId) + .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) + .list() + .getItems.asScala + + val podByExecId = pods.flatMap { p => + Option(p.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)).map(_ -> p) + }.toMap + + latestReports.values().asScala.foreach { report => + podByExecId.get(report.executorId).foreach { pod => + pvcsOf(pod).foreach { pvcName => + if (!failedPvcs.contains(pvcName)) { + tryResize(client, pvcName, report.ratio, report.executorId) + } + } + } + } + case _ => + logWarning("Skipping PVC resize: schedulerBackend is not " + + "KubernetesClusterSchedulerBackend.") + } + } + + private[k8s] def pvcsOf(pod: Pod): Set[String] = { + val volNameToPvc = pod.getSpec.getVolumes.asScala + .filter(_.getPersistentVolumeClaim != null) + .map(v => v.getName -> v.getPersistentVolumeClaim.getClaimName) + .toMap + pod.getSpec.getContainers.asScala + .find(_.getName == DEFAULT_EXECUTOR_CONTAINER_NAME) + .orElse(pod.getSpec.getContainers.asScala.headOption) + .toSeq + .flatMap(_.getVolumeMounts.asScala) + .flatMap(m => volNameToPvc.get(m.getName)) + .toSet + } + + private def 
tryResize( Review Comment: Thanks. Is the 6-hour behavior guaranteed by the Kubernetes control plane? It sounds like an AWS EBS backend limitation rather than a generic Kubernetes PVC expansion policy. I have only found it mentioned in AWS EBS articles such as [this](https://aws.amazon.com/blogs/storage/simplifying-amazon-ebs-volume-migration-and-modification-using-the-ebs-csi-driver/). On the Kubernetes side, users request expansion by updating the PVC spec to a larger requested size, and the controller reconciles that desired state asynchronously. The Kubernetes [docs](https://kubernetes.io/docs/concepts/storage/persistent-volumes/#recovering-from-failure-when-expanding-volumes) also say that failed expansion requests are retried continuously and should be monitored via PVC status/events. I don't see a generic Kubernetes rule that rejects subsequent, larger PVC spec updates while a previous resize is still pending. So even if EBS rejects the actual backend ModifyVolume calls within 6 hours, the Spark driver may keep increasing the PVC's desired size in Kubernetes (for example, from 50Gi to 100Gi to 200Gi) before the first resize has converged. That changes the target state: it can cause over-expansion once the backend is eventually able to process the request, or leave the PVC stuck retrying a much larger requested size. I think the plugin should be storage-provider-neutral and should not rely on AWS EBS cooldown behavior for safety. A simple guard would prevent repeated target inflation while still following Kubernetes' desired-state model: for example, skip the resize when `spec.resources.requests.storage` is already greater than `status.capacity["storage"]`, or otherwise track an in-flight resize per PVC. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
