[GitHub] [spark] Ngone51 commented on a change in pull request #31249: [SPARK-34104][SPARK-34105][CORE][K8S] Maximum decommissioning time & allow decommissioning for excludes

2021-02-02 Thread GitBox


Ngone51 commented on a change in pull request #31249:
URL: https://github.com/apache/spark/pull/31249#discussion_r568339020



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -1942,6 +1950,15 @@ package object config {
   .timeConf(TimeUnit.SECONDS)
   .createOptional
 
+  private[spark] val EXECUTOR_DECOMMISSION_CLEANUP_INTERVAL =
+    ConfigBuilder("spark.executor.decommission.cleanupInterval")

Review comment:
   Shall we rename it to `*.decommission.forceKillTimeout`? `Interval` sounds more like a periodic task, while this kill actually happens only once.
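
For illustration, the renamed entry could be declared along these lines (a sketch only; the doc text and version below are assumptions, not the PR's final definition):

```scala
// Sketch of the suggested forceKillTimeout name; doc text and version are illustrative.
private[spark] val EXECUTOR_DECOMMISSION_FORCE_KILL_TIMEOUT =
  ConfigBuilder("spark.executor.decommission.forceKillTimeout")
    .doc("Duration after which a decommissioning executor will be forcibly killed.")
    .version("3.2.0")
    .timeConf(TimeUnit.SECONDS)
    .createOptional
```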


##
File path: core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
##
@@ -64,6 +65,7 @@ private[scheduler] class HealthTracker (
   val EXCLUDE_ON_FAILURE_TIMEOUT_MILLIS = HealthTracker.getExludeOnFailureTimeout(conf)
   private val EXCLUDE_FETCH_FAILURE_ENABLED =
     conf.get(config.EXCLUDE_ON_FAILURE_FETCH_FAILURE_ENABLED)
+  private val decommission = conf.get(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED)

Review comment:
   Shall we keep the same naming pattern here, e.g., `EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED`? Consistent naming usually makes the code easier for developers to understand.
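
For example (a sketch, not the PR's code), the field could mirror the config entry's name, like the `EXCLUDE_FETCH_FAILURE_ENABLED` line above:

```scala
// Sketch: name the field after the config entry it reads.
private val EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED =
  conf.get(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED)
```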

##
File path: core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
##
@@ -554,6 +554,50 @@ class HealthTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with Mock
     verify(allocationClientMock).killExecutorsOnHost("hostA")
   }
 
+  test("excluding decommission and kills executors when enabled") {
+    val allocationClientMock = mock[ExecutorAllocationClient]
+
+    // verify we decommission when configured
+    conf.set(config.EXCLUDE_ON_FAILURE_KILL_ENABLED, true)
+    conf.set(config.DECOMMISSION_ENABLED.key, "true")
+    conf.set(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED.key, "true")
+    conf.set(config.MAX_FAILURES_PER_EXEC.key, "1")
+    conf.set(config.MAX_FAILED_EXEC_PER_NODE.key, "2")
+    healthTracker = new HealthTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
+
+    // Fail 4 tasks in one task set on executor 1, so that executor gets excluded for the whole
+    // application.
+    val taskSetExclude2 = createTaskSetExcludelist(stageId = 0)
+    (0 until 4).foreach { partition =>
+      taskSetExclude2.updateExcludedForFailedTask(
+        "hostA", exec = "1", index = partition, failureReason = "testing")
+    }
+    healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, taskSetExclude2.execToFailures)
+
+    val msg1 =
+      "Killing excluded executor id 1 since spark.excludeOnFailure.killExcludedExecutors is set."

Review comment:
   Shall we include a decommission hint in the message when decommissioning is enabled (in `killExecutor()`)? e.g., "Killing (decommissioning) excluded executor..."
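
One possible shape for such a message, sketched as a standalone helper (the helper name and wiring are assumptions for illustration, not the PR's actual code):

```scala
// Sketch only: pick the verb based on whether decommissioning is enabled.
def excludedExecutorMessage(exec: String, decommission: Boolean): String = {
  val action = if (decommission) "Decommissioning" else "Killing"
  s"$action excluded executor id $exec since spark.excludeOnFailure.killExcludedExecutors is set."
}

// e.g. excludedExecutorMessage("1", decommission = true)
```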

[GitHub] [spark] Ngone51 commented on a change in pull request #31249: [SPARK-34104][SPARK-34105][CORE][K8S] Maximum decommissioning time & allow decommissioning for excludes

2021-02-01 Thread GitBox


Ngone51 commented on a change in pull request #31249:
URL: https://github.com/apache/spark/pull/31249#discussion_r568358423



##
File path: core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
##
@@ -554,6 +554,50 @@ class HealthTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with Mock
     verify(allocationClientMock).killExecutorsOnHost("hostA")
   }
 
+  test("excluding decommission and kills executors when enabled") {
+    val allocationClientMock = mock[ExecutorAllocationClient]
+
+    // verify we decommission when configured
+    conf.set(config.EXCLUDE_ON_FAILURE_KILL_ENABLED, true)
+    conf.set(config.DECOMMISSION_ENABLED.key, "true")
+    conf.set(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED.key, "true")
+    conf.set(config.MAX_FAILURES_PER_EXEC.key, "1")
+    conf.set(config.MAX_FAILED_EXEC_PER_NODE.key, "2")
+    healthTracker = new HealthTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
+
+    // Fail 4 tasks in one task set on executor 1, so that executor gets excluded for the whole
+    // application.
+    val taskSetExclude2 = createTaskSetExcludelist(stageId = 0)
+    (0 until 4).foreach { partition =>
+      taskSetExclude2.updateExcludedForFailedTask(
+        "hostA", exec = "1", index = partition, failureReason = "testing")
+    }
+    healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, taskSetExclude2.execToFailures)
+
+    val msg1 =
+      "Killing excluded executor id 1 since spark.excludeOnFailure.killExcludedExecutors is set."
+
+    verify(allocationClientMock).decommissionExecutor(
+      "1", ExecutorDecommissionInfo(msg1), false)
+
+    val taskSetExclude3 = createTaskSetExcludelist(stageId = 1)
+    // Fail 4 tasks in one task set on executor 2, so that executor gets excluded for the whole
+    // application.  Since that's the second executor that is excluded on the same node, we also
+    // exclude that node.
+    (0 until 4).foreach { partition =>
+      taskSetExclude3.updateExcludedForFailedTask(
+        "hostA", exec = "2", index = partition, failureReason = "testing")
+    }
+    healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, taskSetExclude3.execToFailures)
+
+    val msg2 =
+      "Killing excluded executor id 2 since spark.excludeOnFailure.killExcludedExecutors is set."
+    verify(allocationClientMock).decommissionExecutor(
+      "2", ExecutorDecommissionInfo(msg2), false, false)
+    verify(allocationClientMock).decommissionExecutorsOnHost(
+      "hostA")

Review comment:
   nit: turn into one line?
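
i.e., joined into a single call:

```scala
verify(allocationClientMock).decommissionExecutorsOnHost("hostA")
```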





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Ngone51 commented on a change in pull request #31249: [SPARK-34104][SPARK-34105][CORE][K8S] Maximum decommissioning time & allow decommissioning for excludes

2021-02-01 Thread GitBox


Ngone51 commented on a change in pull request #31249:
URL: https://github.com/apache/spark/pull/31249#discussion_r568356238



##
File path: core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
##
@@ -554,6 +554,50 @@ class HealthTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with Mock
     verify(allocationClientMock).killExecutorsOnHost("hostA")
   }
 
+  test("excluding decommission and kills executors when enabled") {
+    val allocationClientMock = mock[ExecutorAllocationClient]
+
+    // verify we decommission when configured
+    conf.set(config.EXCLUDE_ON_FAILURE_KILL_ENABLED, true)
+    conf.set(config.DECOMMISSION_ENABLED.key, "true")
+    conf.set(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED.key, "true")
+    conf.set(config.MAX_FAILURES_PER_EXEC.key, "1")
+    conf.set(config.MAX_FAILED_EXEC_PER_NODE.key, "2")
+    healthTracker = new HealthTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
+
+    // Fail 4 tasks in one task set on executor 1, so that executor gets excluded for the whole
+    // application.
+    val taskSetExclude2 = createTaskSetExcludelist(stageId = 0)
+    (0 until 4).foreach { partition =>
+      taskSetExclude2.updateExcludedForFailedTask(
+        "hostA", exec = "1", index = partition, failureReason = "testing")
+    }
+    healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, taskSetExclude2.execToFailures)
+
+    val msg1 =
+      "Killing excluded executor id 1 since spark.excludeOnFailure.killExcludedExecutors is set."

Review comment:
   Shall we include a decommission hint in the message when decommissioning is enabled (in `killExecutor()`)? e.g., "Killing (decommissioning) excluded executor..."








[GitHub] [spark] Ngone51 commented on a change in pull request #31249: [SPARK-34104][SPARK-34105][CORE][K8S] Maximum decommissioning time & allow decommissioning for excludes

2021-02-01 Thread GitBox


Ngone51 commented on a change in pull request #31249:
URL: https://github.com/apache/spark/pull/31249#discussion_r568340541



##
File path: core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
##
@@ -64,6 +65,7 @@ private[scheduler] class HealthTracker (
   val EXCLUDE_ON_FAILURE_TIMEOUT_MILLIS = HealthTracker.getExludeOnFailureTimeout(conf)
   private val EXCLUDE_FETCH_FAILURE_ENABLED =
     conf.get(config.EXCLUDE_ON_FAILURE_FETCH_FAILURE_ENABLED)
+  private val decommission = conf.get(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED)

Review comment:
   Shall we keep the same naming pattern here, e.g., `EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED`? Consistent naming usually makes the code easier for developers to understand.








[GitHub] [spark] Ngone51 commented on a change in pull request #31249: [SPARK-34104][SPARK-34105][CORE][K8S] Maximum decommissioning time & allow decommissioning for excludes

2021-02-01 Thread GitBox


Ngone51 commented on a change in pull request #31249:
URL: https://github.com/apache/spark/pull/31249#discussion_r568339020



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -1942,6 +1950,15 @@ package object config {
   .timeConf(TimeUnit.SECONDS)
   .createOptional
 
+  private[spark] val EXECUTOR_DECOMMISSION_CLEANUP_INTERVAL =
+    ConfigBuilder("spark.executor.decommission.cleanupInterval")

Review comment:
   Shall we rename it to `*.decommission.forceKillTimeout`? `Interval` sounds more like it is used for a periodic task, while the kill action here actually happens only once.








[GitHub] [spark] Ngone51 commented on a change in pull request #31249: [SPARK-34104][SPARK-34105][CORE][K8S] Maximum decommissioning time & allow decommissioning for excludes

2021-01-25 Thread GitBox


Ngone51 commented on a change in pull request #31249:
URL: https://github.com/apache/spark/pull/31249#discussion_r563517094



##
File path: core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
##
@@ -184,8 +191,14 @@ private[scheduler] class HealthTracker (
       case Some(a) =>
         logInfo(s"Killing all executors on excluded host $node " +
           s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
-        if (a.killExecutorsOnHost(node) == false) {
-          logError(s"Killing executors on node $node failed.")
+        if (decommission) {
+          if (a.decommissionExecutorsOnHost(node) == false) {
+            logError(s"Decommissioning executors on $node failed.")
+          }
+        } else {
+          if (a.killExecutorsOnHost(node) == false) {

Review comment:
   ditto

##
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##
@@ -176,11 +181,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         }
 
       case KillExecutorsOnHost(host) =>
-        scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
-          killExecutors(exec.toSeq, adjustTargetNumExecutors = false, countFailures = false,
+        scheduler.getExecutorsAliveOnHost(host).foreach { execs =>
+          killExecutors(execs.toSeq, adjustTargetNumExecutors = false, countFailures = false,
             force = true)
         }
 
+      case DecommissionExecutorsOnHost(host) =>
+        val reason = ExecutorDecommissionInfo(s"Decommissioning all executors on $host.")
+        scheduler.getExecutorsAliveOnHost(host).foreach { execs =>
+          val execsWithReasons = execs.map { exec =>
+            (exec, reason)
+          }.toArray

Review comment:
   nit:
   ```suggestion
 val execsWithReasons = execs.map(exec => (exec, reason)).toArray
   
   ```

##
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##
@@ -506,6 +521,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       }
     }
 
+    conf.get(EXECUTOR_DECOMMISSION_CLEANUP_INTERVAL).map { cleanupInterval =>
+      val cleanupTask = new Runnable() {
+        override def run(): Unit = Utils.tryLogNonFatalError {
+          val stragglers = executorsToDecommission.filter(executorsPendingDecommission.contains(_))

Review comment:
   nit:
   ```suggestion
          val stragglers = executorsToDecommission.filter(executorsPendingDecommission.contains)
   ```

##
File path: core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
##
@@ -184,8 +191,14 @@ private[scheduler] class HealthTracker (
       case Some(a) =>
         logInfo(s"Killing all executors on excluded host $node " +
           s"since ${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set.")
-        if (a.killExecutorsOnHost(node) == false) {
-          logError(s"Killing executors on node $node failed.")
+        if (decommission) {
+          if (a.decommissionExecutorsOnHost(node) == false) {
Review comment:
   nit: 
   ```suggestion
   if (!a.decommissionExecutorsOnHost(node)) {
   ```

##
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##
@@ -506,6 +521,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       }
     }
 
+    conf.get(EXECUTOR_DECOMMISSION_CLEANUP_INTERVAL).map { cleanupInterval =>
+      val cleanupTask = new Runnable() {
+        override def run(): Unit = Utils.tryLogNonFatalError {
+          val stragglers = executorsToDecommission.filter(executorsPendingDecommission.contains(_))
+          if (stragglers.nonEmpty) {
+            logInfo(
+              s"${stragglers.toList} failed to decommission in ${cleanupInterval}, killing.")

Review comment:
   nit: the curly braces around `cleanupInterval` in the interpolation are unnecessary.
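
For readers following the thread, the timeout-then-force-kill pattern under review can be sketched in isolation as below; the names and the plain `ScheduledExecutorService` are illustrative assumptions, not the PR's exact code in `CoarseGrainedSchedulerBackend`:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Illustrative sketch: once the configured timeout elapses, any executor that was asked
// to decommission but is still pending gets force-killed.
object ForceKillAfterTimeoutSketch {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def scheduleForceKill(
      executorsToDecommission: Seq[String],
      stillPending: String => Boolean,
      forceKillTimeoutSec: Long,
      killExecutors: Seq[String] => Unit): Unit = {
    val task = new Runnable {
      override def run(): Unit = {
        // Executors still pending when the timer fires are stragglers.
        val stragglers = executorsToDecommission.filter(stillPending)
        if (stragglers.nonEmpty) {
          println(s"${stragglers.toList} failed to decommission in ${forceKillTimeoutSec}s, killing.")
          killExecutors(stragglers)
        }
      }
    }
    scheduler.schedule(task, forceKillTimeoutSec, TimeUnit.SECONDS)
  }
}
```

In the quoted diff the equivalent check runs inside the scheduler backend via `Utils.tryLogNonFatalError`; the sketch above only shows the scheduling shape.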

##
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##
@@ -839,7 +869,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       Future.successful (if (killSuccessful) executorsToKill else Seq.empty[String])
     )(ThreadUtils.sameThread)
   }
-

Review comment:
   Could you revert this unrelated change?

##
File path: core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
##
@@ -40,6 +40,7 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
  *  stage, but still many failures over the entire application
  *  * "flaky" executors -- they don't fail every task, but are still faulty 
enough to merit
  *  excluding
+ *  * missing shuffle files -- may trigger fetch failures on health executors.

Review comment:
   nit: "healthy"?

