This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e234cd8276a [SPARK-41388][K8S] `getReusablePVCs` should ignore recently created PVCs in the previous batch e234cd8276a is described below commit e234cd8276a603ab8a191dd078b11c605b22a50c Author: Dongjoon Hyun <dh...@apple.com> AuthorDate: Mon Dec 5 01:09:50 2022 -0800 [SPARK-41388][K8S] `getReusablePVCs` should ignore recently created PVCs in the previous batch ### What changes were proposed in this pull request? This PR aims to prevent `getReusablePVCs` from choosing recently created PVCs in the immediately previous batch by excluding newly created PVCs whose creation time is within `spark.kubernetes.allocation.batch.delay`. ### Why are the changes needed? In case of a slow K8s control plane situation where `spark.kubernetes.allocation.batch.delay` is relatively too short or `spark.kubernetes.executor.enablePollingWithResourceVersion=true` is used, `onNewSnapshots` may not return the full list of executor pods created by the previous batch. This sometimes makes the Spark driver think the PVCs in the previous batch are reusable for the next batch. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs with the newly created test case. Closes #38912 from dongjoon-hyun/SPARK-41388. 
Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../cluster/k8s/ExecutorPodsAllocator.scala | 6 +++++- .../cluster/k8s/ExecutorPodsAllocatorSuite.scala | 22 ++++++++++++++++++++-- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 524ab0c845c..d8ae910b1ae 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -373,7 +373,11 @@ class ExecutorPodsAllocator( .getItems .asScala - val reusablePVCs = createdPVCs.filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName)) + val now = Instant.now().toEpochMilli + val reusablePVCs = createdPVCs + .filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName)) + .filter(pvc => now - Instant.parse(pvc.getMetadata.getCreationTimestamp).toEpochMilli + > podAllocationDelay) logInfo(s"Found ${reusablePVCs.size} reusable PVCs from ${createdPVCs.size} PVCs") reusablePVCs } catch { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index caec9ef9201..c526bf0968e 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler.cluster.k8s import java.time.Instant +import 
java.time.temporal.ChronoUnit.MILLIS import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ @@ -721,8 +722,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { .set(s"$prefix.option.sizeLimit", "200Gi") .set(s"$prefix.option.storageClass", "gp2") - when(persistentVolumeClaimList.getItems) - .thenReturn(Seq(persistentVolumeClaim("pvc-0", "gp2", "200Gi")).asJava) + val pvc = persistentVolumeClaim("pvc-0", "gp2", "200Gi") + pvc.getMetadata + .setCreationTimestamp(Instant.now().minus(podAllocationDelay + 1, MILLIS).toString) + when(persistentVolumeClaimList.getItems).thenReturn(Seq(pvc).asJava) when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr), meq(kubernetesClient), any(classOf[ResourceProfile]))) .thenAnswer((invocation: InvocationOnMock) => { @@ -791,6 +794,21 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { podsAllocatorUnderTest invokePrivate getReusablePVCs("appId", Seq.empty[String]) } + test("SPARK-41388: getReusablePVCs should ignore recently created PVCs in the previous batch") { + val getReusablePVCs = + PrivateMethod[mutable.Buffer[PersistentVolumeClaim]](Symbol("getReusablePVCs")) + + val pvc1 = persistentVolumeClaim("pvc-0", "gp2", "200Gi") + val pvc2 = persistentVolumeClaim("pvc-1", "gp2", "200Gi") + + val now = Instant.now() + pvc1.getMetadata.setCreationTimestamp(now.minus(2 * podAllocationDelay, MILLIS).toString) + pvc2.getMetadata.setCreationTimestamp(now.toString) + + when(persistentVolumeClaimList.getItems).thenReturn(Seq(pvc1, pvc2).asJava) + podsAllocatorUnderTest invokePrivate getReusablePVCs("appId", Seq("pvc-1")) + } + private def executorPodAnswer(): Answer[KubernetesExecutorSpec] = (invocation: InvocationOnMock) => { val k8sConf: KubernetesExecutorConf = invocation.getArgument(0) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org 
For additional commands, e-mail: commits-h...@spark.apache.org