This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 50641d2  [SPARK-34104][SPARK-34105][CORE][K8S] Maximum decommissioning 
time & allow decommissioning for excludes
50641d2 is described below

commit 50641d2e3d659f51432aa2c0e6b9af76d71a5796
Author: Holden Karau <hka...@apple.com>
AuthorDate: Tue Feb 9 14:21:24 2021 -0800

    [SPARK-34104][SPARK-34105][CORE][K8S] Maximum decommissioning time & allow 
decommissioning for excludes
    
    ### What changes were proposed in this pull request?
    
    Allow users to have Spark attempt to decommission excluded executors.
    Since excluded executors may be flaky, this also adds the ability for users 
to specify a time limit after which a decommissioning executor will be killed 
by Spark.
    
    ### Why are the changes needed?
    
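    This may help prevent fetch failures from excluded executors, and also 
handle the situation in which executors are flaky and fail to finish 
decommissioning, by letting Spark kill them once a time limit is reached.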
    
    ### Does this PR introduce _any_ user-facing change?
    
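    Yes, two new configuration flags control the behaviour: 
`spark.excludeOnFailure.killExcludedExecutors.decommission` (decommission 
excluded executors instead of killing them directly) and 
`spark.executor.decommission.forceKillTimeout` (time after which Spark forces 
a decommissioning executor to exit).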
    
    ### How was this patch tested?
    
    Extended unit and integration tests.
    
    Closes #31249 from 
holdenk/configure-inaccessibleList-kill-to-use-decommissioning.
    
    Lead-authored-by: Holden Karau <hka...@apple.com>
    Co-authored-by: Holden Karau <hol...@pigscanfly.ca>
    Signed-off-by: Holden Karau <hka...@apple.com>
---
 .../apache/spark/ExecutorAllocationClient.scala    |  6 +++
 .../org/apache/spark/internal/config/package.scala | 19 +++++++-
 .../org/apache/spark/scheduler/HealthTracker.scala | 35 +++++++++++---
 .../cluster/CoarseGrainedClusterMessage.scala      |  3 ++
 .../cluster/CoarseGrainedSchedulerBackend.scala    | 56 ++++++++++++++++++++--
 .../spark/scheduler/HealthTrackerSuite.scala       | 45 +++++++++++++++++
 .../k8s/integrationtest/DecommissionSuite.scala    | 32 +++++++++++++
 .../k8s/integrationtest/KubernetesSuite.scala      |  5 +-
 8 files changed, 187 insertions(+), 14 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index cdba1c4..5b587d7 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -129,6 +129,12 @@ private[spark] trait ExecutorAllocationClient {
     decommissionedExecutors.nonEmpty && 
decommissionedExecutors(0).equals(executorId)
   }
 
+  /**
+   * Request that the cluster manager decommission every executor on the 
specified host.
+   *
+   * @return whether the request is acknowledged by the cluster manager.
+   */
+  def decommissionExecutorsOnHost(host: String): Boolean
 
   /**
    * Request that the cluster manager kill every executor on the specified 
host.
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 7aeb51d..3101bb6 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -827,6 +827,13 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED =
+    ConfigBuilder("spark.excludeOnFailure.killExcludedExecutors.decommission")
+      .doc("Attempt decommission of excluded nodes instead of going directly 
to kill")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val EXCLUDE_ON_FAILURE_LEGACY_TIMEOUT_CONF =
     ConfigBuilder("spark.scheduler.executorTaskExcludeOnFailureTime")
       .internal()
@@ -1958,7 +1965,8 @@ package object config {
 
   private[spark] val EXECUTOR_DECOMMISSION_KILL_INTERVAL =
     ConfigBuilder("spark.executor.decommission.killInterval")
-      .doc("Duration after which a decommissioned executor will be killed 
forcefully." +
+      .doc("Duration after which a decommissioned executor will be killed 
forcefully " +
+        "*by an outside* (e.g. non-spark) service. " +
         "This config is useful for cloud environments where we know in advance 
when " +
         "an executor is going to go down after decommissioning signal i.e. 
around 2 mins " +
         "in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is 
currently " +
@@ -1967,6 +1975,15 @@ package object config {
       .timeConf(TimeUnit.SECONDS)
       .createOptional
 
+  private[spark] val EXECUTOR_DECOMMISSION_FORCE_KILL_TIMEOUT =
+    ConfigBuilder("spark.executor.decommission.forceKillTimeout")
+      .doc("Duration after which a Spark will force a decommissioning executor 
to exit." +
+        " this should be set to a high value in most situations as low values 
will prevent " +
+        " block migrations from having enough time to complete.")
+      .version("3.2.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createOptional
+
   private[spark] val EXECUTOR_DECOMMISSION_SIGNAL =
     ConfigBuilder("spark.executor.decommission.signal")
       .doc("The signal that used to trigger the executor to start 
decommission.")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala 
b/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
index c6b8dca..6bd5668 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
@@ -40,6 +40,7 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
  *      stage, but still many failures over the entire application
  *  * "flaky" executors -- they don't fail every task, but are still faulty 
enough to merit
  *      excluding
+ *  * missing shuffle files -- may trigger fetch failures on healthy executors.
  *
  * See the design doc on SPARK-8425 for a more in-depth discussion. Note 
SPARK-32037 renamed
  * the feature.
@@ -64,6 +65,8 @@ private[scheduler] class HealthTracker (
   val EXCLUDE_ON_FAILURE_TIMEOUT_MILLIS = 
HealthTracker.getExludeOnFailureTimeout(conf)
   private val EXCLUDE_FETCH_FAILURE_ENABLED =
     conf.get(config.EXCLUDE_ON_FAILURE_FETCH_FAILURE_ENABLED)
+  private val EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED =
+    conf.get(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED)
 
   /**
    * A map from executorId to information on task failures. Tracks the time of 
each task failure,
@@ -154,11 +157,21 @@ private[scheduler] class HealthTracker (
   }
 
   private def killExecutor(exec: String, msg: String): Unit = {
+    val fullMsg = if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) {
+      s"${msg} (actually decommissioning)"
+    } else {
+      msg
+    }
     allocationClient match {
       case Some(a) =>
-        logInfo(msg)
-        a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, 
countFailures = false,
-          force = true)
+        logInfo(fullMsg)
+        if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) {
+          a.decommissionExecutor(exec, ExecutorDecommissionInfo(fullMsg),
+            adjustTargetNumExecutors = false)
+        } else {
+          a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, 
countFailures = false,
+            force = true)
+        }
       case None =>
         logInfo(s"Not attempting to kill excluded executor id $exec " +
           s"since allocation client is not defined.")
@@ -182,10 +195,18 @@ private[scheduler] class HealthTracker (
     if (conf.get(config.EXCLUDE_ON_FAILURE_KILL_ENABLED)) {
       allocationClient match {
         case Some(a) =>
-          logInfo(s"Killing all executors on excluded host $node " +
-            s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
-          if (a.killExecutorsOnHost(node) == false) {
-            logError(s"Killing executors on node $node failed.")
+          if (EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) {
+            logInfo(s"Decommissioning all executors on excluded host $node " +
+              s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
+            if (!a.decommissionExecutorsOnHost(node)) {
+              logError(s"Decommissioning executors on $node failed.")
+            }
+          } else {
+            logInfo(s"Killing all executors on excluded host $node " +
+              s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
+            if (!a.killExecutorsOnHost(node)) {
+              logError(s"Killing executors on node $node failed.")
+            }
           }
         case None =>
           logWarning(s"Not attempting to kill executors on excluded host $node 
" +
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 2f17143..a6f52f9 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -49,6 +49,9 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillExecutorsOnHost(host: String)
     extends CoarseGrainedClusterMessage
 
+  case class DecommissionExecutorsOnHost(host: String)
+    extends CoarseGrainedClusterMessage
+
   case class UpdateDelegationTokens(tokens: Array[Byte])
     extends CoarseGrainedClusterMessage
 
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index ccb5eb1..b44f677 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster
 
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 import javax.annotation.concurrent.GuardedBy
 
@@ -115,6 +115,11 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   private val reviveThread =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
 
+  private val cleanupService: Option[ScheduledExecutorService] =
+    conf.get(EXECUTOR_DECOMMISSION_FORCE_KILL_TIMEOUT).map { _ =>
+      
ThreadUtils.newDaemonSingleThreadScheduledExecutor("cleanup-decommission-execs")
+    }
+
   class DriverEndpoint extends IsolatedRpcEndpoint with Logging {
 
     override val rpcEnv: RpcEnv = CoarseGrainedSchedulerBackend.this.rpcEnv
@@ -176,11 +181,20 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
         }
 
       case KillExecutorsOnHost(host) =>
-        scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
-          killExecutors(exec.toSeq, adjustTargetNumExecutors = false, 
countFailures = false,
+        scheduler.getExecutorsAliveOnHost(host).foreach { execs =>
+          killExecutors(execs.toSeq, adjustTargetNumExecutors = false, 
countFailures = false,
             force = true)
         }
 
+      case DecommissionExecutorsOnHost(host) =>
+        val reason = ExecutorDecommissionInfo(s"Decommissioning all executors 
on $host.")
+        scheduler.getExecutorsAliveOnHost(host).foreach { execs =>
+          val execsWithReasons = execs.map(exec => (exec, reason)).toArray
+
+          decommissionExecutors(execsWithReasons, adjustTargetNumExecutors = 
false,
+            triggeredByExecutor = false)
+        }
+
       case UpdateDelegationTokens(newDelegationTokens) =>
         updateDelegationTokens(newDelegationTokens)
 
@@ -506,6 +520,21 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
       }
     }
 
+    conf.get(EXECUTOR_DECOMMISSION_FORCE_KILL_TIMEOUT).map { cleanupInterval =>
+      val cleanupTask = new Runnable() {
+        override def run(): Unit = Utils.tryLogNonFatalError {
+          val stragglers = CoarseGrainedSchedulerBackend.this.synchronized {
+            
executorsToDecommission.filter(executorsPendingDecommission.contains)
+          }
+          if (stragglers.nonEmpty) {
+            logInfo(s"${stragglers.toList} failed to decommission in 
${cleanupInterval}, killing.")
+            killExecutors(stragglers, false, false, true)
+          }
+        }
+      }
+      cleanupService.map(_.schedule(cleanupTask, cleanupInterval, 
TimeUnit.SECONDS))
+    }
+
     executorsToDecommission
   }
 
@@ -548,6 +577,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
   override def stop(): Unit = {
     reviveThread.shutdownNow()
+    cleanupService.foreach(_.shutdownNow())
     stopExecutors()
     delegationTokenManager.foreach(_.stop())
     try {
@@ -851,13 +881,29 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     Future.successful(false)
 
   /**
+   * Request that the cluster manager decommissions all executors on a given 
host.
+   * @return whether the decommission request is acknowledged.
+   */
+  final override def decommissionExecutorsOnHost(host: String): Boolean = {
+    logInfo(s"Requesting to kill any and all executors on host $host")
+    // A potential race exists if a new executor attempts to register on a host
+    // that is on the exclude list and is no longer valid. To avoid this race,
+    // all executor registration and decommissioning happens in the event 
loop. This way, either
+    // an executor will fail to register, or will be decommed when all 
executors on a host
+    // are decommed.
+    // Decommission all the executors on this host in an event loop to ensure 
serialization.
+    driverEndpoint.send(DecommissionExecutorsOnHost(host))
+    true
+  }
+
+  /**
    * Request that the cluster manager kill all executors on a given host.
    * @return whether the kill request is acknowledged.
    */
   final override def killExecutorsOnHost(host: String): Boolean = {
-    logInfo(s"Requesting to kill any and all executors on host ${host}")
+    logInfo(s"Requesting to kill any and all executors on host $host")
     // A potential race exists if a new executor attempts to register on a host
-    // that is on the exclude list and is no no longer valid. To avoid this 
race,
+    // that is on the exclude list and is no longer valid. To avoid this race,
     // all executor registration and killing happens in the event loop. This 
way, either
     // an executor will fail to register, or will be killed when all executors 
on a host
     // are killed.
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
index 7ecc1f5..5710be1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
@@ -554,6 +554,51 @@ class HealthTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with Mock
     verify(allocationClientMock).killExecutorsOnHost("hostA")
   }
 
+  test("excluding decommission and kills executors when enabled") {
+    val allocationClientMock = mock[ExecutorAllocationClient]
+
+    // verify we decommission when configured
+    conf.set(config.EXCLUDE_ON_FAILURE_KILL_ENABLED, true)
+    conf.set(config.DECOMMISSION_ENABLED.key, "true")
+    conf.set(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED.key, "true")
+    conf.set(config.MAX_FAILURES_PER_EXEC.key, "1")
+    conf.set(config.MAX_FAILED_EXEC_PER_NODE.key, "2")
+    healthTracker = new HealthTracker(listenerBusMock, conf, 
Some(allocationClientMock), clock)
+
+    // Fail 4 tasks in one task set on executor 1, so that executor gets 
excluded for the whole
+    // application.
+    val taskSetExclude2 = createTaskSetExcludelist(stageId = 0)
+    (0 until 4).foreach { partition =>
+      taskSetExclude2.updateExcludedForFailedTask(
+        "hostA", exec = "1", index = partition, failureReason = "testing")
+    }
+    healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, 
taskSetExclude2.execToFailures)
+
+    val msg1 =
+      "Killing excluded executor id 1 since 
spark.excludeOnFailure.killExcludedExecutors is set." +
+      " (actually decommissioning)"
+
+    verify(allocationClientMock).decommissionExecutor(
+      "1", ExecutorDecommissionInfo(msg1), false)
+
+    val taskSetExclude3 = createTaskSetExcludelist(stageId = 1)
+    // Fail 4 tasks in one task set on executor 2, so that executor gets 
excluded for the whole
+    // application.  Since that's the second executor that is excluded on the 
same node, we also
+    // exclude that node.
+    (0 until 4).foreach { partition =>
+      taskSetExclude3.updateExcludedForFailedTask(
+        "hostA", exec = "2", index = partition, failureReason = "testing")
+    }
+    healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, 
taskSetExclude3.execToFailures)
+
+    val msg2 =
+      "Killing excluded executor id 2 since 
spark.excludeOnFailure.killExcludedExecutors is set." +
+      " (actually decommissioning)"
+    verify(allocationClientMock).decommissionExecutor(
+      "2", ExecutorDecommissionInfo(msg2), false, false)
+    verify(allocationClientMock).decommissionExecutorsOnHost("hostA")
+  }
+
   test("fetch failure excluding kills executors, configured by 
EXCLUDE_ON_FAILURE_KILL_ENABLED") {
     val allocationClientMock = mock[ExecutorAllocationClient]
     when(allocationClientMock.killExecutors(any(), any(), any(), 
any())).thenReturn(Seq("called"))
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
index 92f6a32..56a23ab 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
@@ -116,6 +116,38 @@ private[spark] trait DecommissionSuite { k8sSuite: 
KubernetesSuite =>
       executorPatience = None,
       decommissioningTest = false)
   }
+
+  test("Test decommissioning timeouts", k8sTestTag) {
+    sparkAppConf
+      .set(config.DECOMMISSION_ENABLED.key, "true")
+      .set("spark.kubernetes.container.image", pyImage)
+      .set(config.STORAGE_DECOMMISSION_ENABLED.key, "true")
+      .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED.key, "true")
+      .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED.key, "true")
+      // Ensure we have somewhere to migrate our data too
+      .set("spark.executor.instances", "3")
+      // Set super high so the timeout is triggered
+      .set("spark.storage.decommission.replicationReattemptInterval", 
"8640000")
+      // Set super low so the timeout is triggered
+      .set(config.EXECUTOR_DECOMMISSION_CLEANUP_INTERVAL.key, "10")
+
+    runSparkApplicationAndVerifyCompletion(
+      appResource = PYSPARK_DECOMISSIONING,
+      mainClass = "",
+      expectedDriverLogOnCompletion = Seq(
+        "Finished waiting, stopping Spark",
+        "Decommission executors",
+        "failed to decommission in 10, killing",
+        "killed by driver."),
+      appArgs = Array.empty[String],
+      driverPodChecker = doBasicDriverPyPodCheck,
+      executorPodChecker = doBasicExecutorPyPodCheck,
+      appLocator = appLocator,
+      isJVM = false,
+      pyFiles = None,
+      executorPatience = None,
+      decommissioningTest = true)
+  }
 }
 
 private[spark] object DecommissionSuite {
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 494c825..9f1bcf7 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -345,8 +345,11 @@ class KubernetesSuite extends SparkFunSuite
                 }
                 // Delete the pod to simulate cluster scale down/migration.
                 // This will allow the pod to remain up for the grace period
+                // We set an intentionally long grace period to test that Spark
+                // exits once the blocks are done migrating and doesn't wait 
for the
+                // entire grace period if it does not need to.
                 kubernetesTestComponents.kubernetesClient.pods()
-                  .withName(name).delete()
+                  .withName(name).withGracePeriod(Int.MaxValue).delete()
                 logDebug(s"Triggered pod decom/delete: $name deleted")
                 // Make sure this pod is deleted
                 Eventually.eventually(TIMEOUT, INTERVAL) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to