This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b2dc38b  [SPARK-34334][K8S] Correctly identify timed out pending pod requests as excess request
b2dc38b is described below

commit b2dc38b6546552cf3fcfdcd466d7d04d9aa3078c
Author: “attilapiros” <piros.attila.zs...@gmail.com>
AuthorDate: Tue Feb 9 10:06:55 2021 -0800

    [SPARK-34334][K8S] Correctly identify timed out pending pod requests as excess request
    
    ### What changes were proposed in this pull request?
    
    Fixing the identification of timed-out pending pod requests as excess requests to delete when the excess is higher than the number of newly created timed-out requests and there are some non-timed-out newly created requests too.
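    
    A minimal sketch of the corrected ordering (simplified from the diff below; the surrounding allocator class and its fields are elided):
    
    ```scala
    // Collect the timed-out newly created requests first, capped at the excess.
    val newlyCreatedToDelete = newlyCreatedExecutorsForRpId
      .filter { case (_, (_, createTime)) => currentTime - createTime > executorIdleTimeout }
      .keys.take(excess).toList
    // Then take only the remaining excess from the timed-out pending requests.
    val knownPendingToDelete = currentPendingExecutors
      .filter(x => isExecutorIdleTimedOut(x._2, currentTime))
      .take(excess - newlyCreatedToDelete.size)
      .map { case (id, _) => id }
    val toDelete = newlyCreatedToDelete ++ knownPendingToDelete
    ```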
    
    ### Why are the changes needed?
    
    After https://github.com/apache/spark/pull/29981 only timed-out newly created requests and timed-out pending requests are taken as excess requests.
    
    But there is a small bug when the excess is higher than the number of newly created timed-out requests and there are some non-timed-out newly created requests as well: all the newly created requests are counted as excess requests when items are chosen from the timed-out pending pod requests.
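    
    A hypothetical walk-through of the miscount (the numbers are illustrative only, not taken from the patch):
    
    ```scala
    // excess = 2, one newly created request that has NOT timed out yet,
    // and two timed-out pending requests (executor ids 1 and 2).
    val excess = 2
    val newlyCreated = Map(3L -> false)                         // executor id -> timed out?
    val timedOutPending = Seq(1L, 2L)
    val newlyCreatedTimedOut = newlyCreated.collect { case (id, true) => id }.toList
    
    // Old logic subtracted ALL newly created requests, so only one pending request was picked:
    timedOutPending.take(excess - newlyCreated.size)            // Seq(1)
    // Fixed logic subtracts only the timed-out newly created requests:
    timedOutPending.take(excess - newlyCreatedTimedOut.size)    // Seq(1, 2)
    ```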
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    A new unit test was added: `SPARK-34334: correctly identify timed out pending pod requests as excess`.
    
    Closes #31445 from attilapiros/SPARK-34334.
    
    Authored-by: “attilapiros” <piros.attila.zs...@gmail.com>
    Signed-off-by: Holden Karau <hka...@apple.com>
---
 .../cluster/k8s/ExecutorPodsAllocator.scala        | 11 +++---
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   | 45 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 5 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index f4cd2d0..eb35de8 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -223,14 +223,15 @@ private[spark] class ExecutorPodsAllocator(
 
       if (knownPodCount > targetNum) {
         val excess = knownPodCount - targetNum
+        val newlyCreatedToDelete = newlyCreatedExecutorsForRpId
+          .filter { case (_, (_, createTime)) =>
+            currentTime - createTime > executorIdleTimeout
+          }.keys.take(excess).toList
         val knownPendingToDelete = currentPendingExecutors
           .filter(x => isExecutorIdleTimedOut(x._2, currentTime))
+          .take(excess - newlyCreatedToDelete.size)
           .map { case (id, _) => id }
-          .take(excess - newlyCreatedExecutorsForRpId.size)
-        val toDelete = newlyCreatedExecutorsForRpId
-          .filter { case (_, (_, createTime)) =>
-            currentTime - createTime > executorIdleTimeout
-          }.keys.take(excess).toList ++ knownPendingToDelete
+        val toDelete = newlyCreatedToDelete ++ knownPendingToDelete
 
         if (toDelete.nonEmpty) {
           logInfo(s"Deleting ${toDelete.size} excess pod requests 
(${toDelete.mkString(",")}).")
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index d4d8980..eaf5fd8 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -216,6 +216,51 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(!podsAllocatorUnderTest.isDeleted("4"))
   }
 
+  test("SPARK-34334: correctly identify timed out pending pod requests as 
excess") {
+    when(podOperations
+      .withField("status.phase", "Pending"))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
+      .thenReturn(podOperations)
+
+    val startTime = Instant.now.toEpochMilli
+    waitForExecutorPodsClock.setTime(startTime)
+
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+    verify(podOperations).create(podWithAttachedContainerForId(1))
+    verify(podOperations).create(any())
+
+    snapshotsStore.updatePod(pendingExecutor(1))
+    snapshotsStore.notifySubscribers()
+
+    waitForExecutorPodsClock.advance(executorIdleTimeout)
+
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2))
+    snapshotsStore.notifySubscribers()
+    verify(podOperations).create(podWithAttachedContainerForId(2))
+
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+    snapshotsStore.notifySubscribers()
+
+    verify(podOperations, never()).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1")
+    verify(podOperations, never()).delete()
+
+    waitForExecutorPodsClock.advance(executorIdleTimeout)
+    snapshotsStore.notifySubscribers()
+
+    // before SPARK-34334 this verify() call failed as the non-timed out newly created request
+    // decreased the number of requests taken from timed out pending pod requests
+    verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1")
+    verify(podOperations).delete()
+  }
+
   test("SPARK-33099: Respect executor idle timeout configuration") {
     when(podOperations
       .withField("status.phase", "Pending"))

