This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e234cd8276a [SPARK-41388][K8S] `getReusablePVCs` should ignore recently created PVCs in the previous batch
e234cd8276a is described below

commit e234cd8276a603ab8a191dd078b11c605b22a50c
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Mon Dec 5 01:09:50 2022 -0800

    [SPARK-41388][K8S] `getReusablePVCs` should ignore recently created PVCs in the previous batch
    
    ### What changes were proposed in this pull request?
    
    This PR prevents `getReusablePVCs` from choosing PVCs that were just created
    by the immediately preceding batch, by excluding any PVC whose creation time
    falls within `spark.kubernetes.allocation.batch.delay`.
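
    For illustration only, a minimal sketch of that check, using a hypothetical
    `podAllocationDelayMs` value and a hypothetical list of PVC creation
    timestamps rather than the allocator's real fields:

    ```scala
    import java.time.Instant

    // Hypothetical stand-ins for illustration; not the allocator's real state.
    val podAllocationDelayMs = 1000L  // e.g. spark.kubernetes.allocation.batch.delay = 1s
    val pvcCreationTimestamps = Seq(
      "pvc-0" -> Instant.now().minusSeconds(60).toString,  // created well before the last batch
      "pvc-1" -> Instant.now().toString)                    // created by the very previous batch

    val now = Instant.now().toEpochMilli
    // Keep only PVCs whose age exceeds the allocation batch delay.
    val reusable = pvcCreationTimestamps.filter { case (_, ts) =>
      now - Instant.parse(ts).toEpochMilli > podAllocationDelayMs
    }
    // reusable keeps only "pvc-0"; "pvc-1" is too recent and is skipped.
    ```

    A PVC created within one batch delay may still be waiting for its executor
    pod to appear in the snapshot, so it is treated as not yet reusable.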
    
    ### Why are the changes needed?
    
    In a slow K8s control plane situation, where `spark.kubernetes.allocation.batch.delay`
    is relatively too short or `spark.kubernetes.executor.enablePollingWithResourceVersion=true`
    is used, `onNewSnapshots` may not return the full list of executor pods created
    by the previous batch. This sometimes makes the Spark driver think the PVCs
    from the previous batch are reusable for the next batch.
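
    For context, the two settings mentioned above can be set on a `SparkConf`;
    the values below are examples only, not recommendations:

    ```scala
    import org.apache.spark.SparkConf

    // Example values only; both keys are taken from the description above.
    val conf = new SparkConf()
      .set("spark.kubernetes.allocation.batch.delay", "1s")
      .set("spark.kubernetes.executor.enablePollingWithResourceVersion", "true")
    ```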
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly created test case.
    
    Closes #38912 from dongjoon-hyun/SPARK-41388.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../cluster/k8s/ExecutorPodsAllocator.scala        |  6 +++++-
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   | 22 ++++++++++++++++++++--
 2 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 524ab0c845c..d8ae910b1ae 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -373,7 +373,11 @@ class ExecutorPodsAllocator(
           .getItems
           .asScala
 
-        val reusablePVCs = createdPVCs.filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
+        val now = Instant.now().toEpochMilli
+        val reusablePVCs = createdPVCs
+          .filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
+          .filter(pvc => now - Instant.parse(pvc.getMetadata.getCreationTimestamp).toEpochMilli
+            > podAllocationDelay)
         logInfo(s"Found ${reusablePVCs.size} reusable PVCs from ${createdPVCs.size} PVCs")
         reusablePVCs
       } catch {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index caec9ef9201..c526bf0968e 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.scheduler.cluster.k8s
 
 import java.time.Instant
+import java.time.temporal.ChronoUnit.MILLIS
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
@@ -721,8 +722,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
       .set(s"$prefix.option.sizeLimit", "200Gi")
       .set(s"$prefix.option.storageClass", "gp2")
 
-    when(persistentVolumeClaimList.getItems)
-      .thenReturn(Seq(persistentVolumeClaim("pvc-0", "gp2", "200Gi")).asJava)
+    val pvc = persistentVolumeClaim("pvc-0", "gp2", "200Gi")
+    pvc.getMetadata
+      .setCreationTimestamp(Instant.now().minus(podAllocationDelay + 1, MILLIS).toString)
+    when(persistentVolumeClaimList.getItems).thenReturn(Seq(pvc).asJava)
     when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
         meq(kubernetesClient), any(classOf[ResourceProfile])))
       .thenAnswer((invocation: InvocationOnMock) => {
@@ -791,6 +794,21 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     podsAllocatorUnderTest invokePrivate getReusablePVCs("appId", Seq.empty[String])
   }
 
+  test("SPARK-41388: getReusablePVCs should ignore recently created PVCs in the previous batch") {
+    val getReusablePVCs =
+      PrivateMethod[mutable.Buffer[PersistentVolumeClaim]](Symbol("getReusablePVCs"))
+
+    val pvc1 = persistentVolumeClaim("pvc-0", "gp2", "200Gi")
+    val pvc2 = persistentVolumeClaim("pvc-1", "gp2", "200Gi")
+
+    val now = Instant.now()
+    pvc1.getMetadata.setCreationTimestamp(now.minus(2 * podAllocationDelay, MILLIS).toString)
+    pvc2.getMetadata.setCreationTimestamp(now.toString)
+
+    when(persistentVolumeClaimList.getItems).thenReturn(Seq(pvc1, pvc2).asJava)
+    podsAllocatorUnderTest invokePrivate getReusablePVCs("appId", Seq("pvc-1"))
+  }
+
   private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
     (invocation: InvocationOnMock) => {
       val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
