This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 79f133b7bbc [SPARK-39688][K8S] `getReusablePVCs` should handle accounts with no PVC permission 79f133b7bbc is described below commit 79f133b7bbc1d9aa6a20dd8a34ec120902f96155 Author: Dongjoon Hyun <dongj...@apache.org> AuthorDate: Tue Jul 5 13:26:43 2022 -0700 [SPARK-39688][K8S] `getReusablePVCs` should handle accounts with no PVC permission ### What changes were proposed in this pull request? This PR aims to handle `KubernetesClientException` in the `getReusablePVCs` method to gracefully handle the cases where accounts have no PVC permission, including `listing`. ### Why are the changes needed? To prevent a regression in Apache Spark 3.4. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs with the newly added test case. Closes #37095 from dongjoon-hyun/SPARK-39688. Authored-by: Dongjoon Hyun <dongj...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../cluster/k8s/ExecutorPodsAllocator.scala | 28 +++++++++++++--------- .../cluster/k8s/ExecutorPodsAllocatorSuite.scala | 10 +++++++- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 3519efd3fcb..9bdc30e4466 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -25,7 +25,7 @@ import scala.collection.mutable import scala.util.control.NonFatal import io.fabric8.kubernetes.api.model.{HasMetadata, PersistentVolumeClaim, Pod, PodBuilder} -import io.fabric8.kubernetes.client.KubernetesClient +import 
io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException} import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.k8s.Config._ @@ -360,16 +360,22 @@ class ExecutorPodsAllocator( private def getReusablePVCs(applicationId: String, pvcsInUse: Seq[String]) = { if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && conf.get(KUBERNETES_DRIVER_REUSE_PVC) && driverPod.nonEmpty) { - val createdPVCs = kubernetesClient - .persistentVolumeClaims - .withLabel("spark-app-selector", applicationId) - .list() - .getItems - .asScala - - val reusablePVCs = createdPVCs.filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName)) - logInfo(s"Found ${reusablePVCs.size} reusable PVCs from ${createdPVCs.size} PVCs") - reusablePVCs + try { + val createdPVCs = kubernetesClient + .persistentVolumeClaims + .withLabel("spark-app-selector", applicationId) + .list() + .getItems + .asScala + + val reusablePVCs = createdPVCs.filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName)) + logInfo(s"Found ${reusablePVCs.size} reusable PVCs from ${createdPVCs.size} PVCs") + reusablePVCs + } catch { + case _: KubernetesClientException => + logInfo("Cannot list PVC resources. 
Please check account permissions.") + mutable.Buffer.empty[PersistentVolumeClaim] + } } else { mutable.Buffer.empty[PersistentVolumeClaim] } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 87bd8ef3d9d..7ce0b57d1e9 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -20,9 +20,10 @@ import java.time.Instant import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ +import scala.collection.mutable import io.fabric8.kubernetes.api.model._ -import io.fabric8.kubernetes.client.KubernetesClient +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException} import io.fabric8.kubernetes.client.dsl.PodResource import org.mockito.{Mock, MockitoAnnotations} import org.mockito.ArgumentMatchers.{any, eq => meq} @@ -762,6 +763,13 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { " namespace default")) } + test("SPARK-39688: getReusablePVCs should handle accounts with no PVC permission") { + val getReusablePVCs = + PrivateMethod[mutable.Buffer[PersistentVolumeClaim]](Symbol("getReusablePVCs")) + when(persistentVolumeClaimList.getItems).thenThrow(new KubernetesClientException("Error")) + podsAllocatorUnderTest invokePrivate getReusablePVCs("appId", Seq.empty[String]) + } + private def executorPodAnswer(): Answer[KubernetesExecutorSpec] = (invocation: InvocationOnMock) => { val k8sConf: KubernetesExecutorConf = invocation.getArgument(0) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional 
commands, e-mail: commits-h...@spark.apache.org