Ngone51 commented on a change in pull request #31249:
URL: https://github.com/apache/spark/pull/31249#discussion_r568339020



##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1942,6 +1950,15 @@ package object config {
       .timeConf(TimeUnit.SECONDS)
       .createOptional
 
+  private[spark] val EXECUTOR_DECOMMISSION_CLEANUP_INTERVAL =
+    ConfigBuilder("spark.executor.decommission.cleanupInterval")

Review comment:
       Shall we rename it to `*.decommission.forceKillTimeout`? `Interval` sounds
       like it is used for a periodic task, while the kill action here actually
       happens only once.
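
For illustration only, a rough sketch of what the renamed entry could look like (the
constant name and doc text below are assumptions, not part of the PR; the builder chain
just mirrors the surrounding entries):

```scala
// Hypothetical sketch; the constant name and doc text are placeholders.
private[spark] val EXECUTOR_DECOMMISSION_FORCE_KILL_TIMEOUT =
  ConfigBuilder("spark.executor.decommission.forceKillTimeout")
    .doc("Duration after which a decommissioning executor will be forcibly killed.")
    .timeConf(TimeUnit.SECONDS)
    .createOptional
```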

##########
File path: core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala
##########
@@ -64,6 +65,7 @@ private[scheduler] class HealthTracker (
   val EXCLUDE_ON_FAILURE_TIMEOUT_MILLIS = HealthTracker.getExludeOnFailureTimeout(conf)
   private val EXCLUDE_FETCH_FAILURE_ENABLED =
     conf.get(config.EXCLUDE_ON_FAILURE_FETCH_FAILURE_ENABLED)
+  private val decommission = conf.get(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED)

Review comment:
       Shall we keep the same naming pattern here, e.g.,
       `EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED`? Consistent naming usually makes the
       code easier for other developers to follow.
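
For illustration, a sketch of the suggested style (the exact constant name here is only
a placeholder):

```scala
// Sketch only: mirrors the naming style of the neighbouring vals.
private val EXCLUDE_DECOMMISSION_ENABLED =
  conf.get(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED)
```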

##########
File path: core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
##########
@@ -554,6 +554,50 @@ class HealthTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with Mock
     verify(allocationClientMock).killExecutorsOnHost("hostA")
   }
 
+  test("excluding decommission and kills executors when enabled") {
+    val allocationClientMock = mock[ExecutorAllocationClient]
+
+    // verify we decommission when configured
+    conf.set(config.EXCLUDE_ON_FAILURE_KILL_ENABLED, true)
+    conf.set(config.DECOMMISSION_ENABLED.key, "true")
+    conf.set(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED.key, "true")
+    conf.set(config.MAX_FAILURES_PER_EXEC.key, "1")
+    conf.set(config.MAX_FAILED_EXEC_PER_NODE.key, "2")
+    healthTracker = new HealthTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
+
+    // Fail 4 tasks in one task set on executor 1, so that executor gets excluded for the whole
+    // application.
+    val taskSetExclude2 = createTaskSetExcludelist(stageId = 0)
+    (0 until 4).foreach { partition =>
+      taskSetExclude2.updateExcludedForFailedTask(
+        "hostA", exec = "1", index = partition, failureReason = "testing")
+    }
+    healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, taskSetExclude2.execToFailures)
+
+    val msg1 =
+      "Killing excluded executor id 1 since spark.excludeOnFailure.killExcludedExecutors is set."

Review comment:
       Shall we include a decommission hint in the message (in `killExecutor()`) when
       decommission is enabled? e.g., "Killing (decommission) excluded executor..."
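
For example, roughly something like this inside `killExecutor()` (a sketch only; the
`decommission` flag comes from this PR, but the `exec` variable name and exact wording
are my assumptions):

```scala
// Sketch: pick the verb based on whether decommissioning is enabled.
val action = if (decommission) "Decommissioning" else "Killing"
val msg = s"$action excluded executor id $exec since " +
  s"${config.EXCLUDE_ON_FAILURE_KILL_ENABLED.key} is set."
```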

##########
File path: core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala
##########
@@ -554,6 +554,50 @@ class HealthTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with Mock
     verify(allocationClientMock).killExecutorsOnHost("hostA")
   }
 
+  test("excluding decommission and kills executors when enabled") {
+    val allocationClientMock = mock[ExecutorAllocationClient]
+
+    // verify we decommission when configured
+    conf.set(config.EXCLUDE_ON_FAILURE_KILL_ENABLED, true)
+    conf.set(config.DECOMMISSION_ENABLED.key, "true")
+    conf.set(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED.key, "true")
+    conf.set(config.MAX_FAILURES_PER_EXEC.key, "1")
+    conf.set(config.MAX_FAILED_EXEC_PER_NODE.key, "2")
+    healthTracker = new HealthTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
+
+    // Fail 4 tasks in one task set on executor 1, so that executor gets excluded for the whole
+    // application.
+    val taskSetExclude2 = createTaskSetExcludelist(stageId = 0)
+    (0 until 4).foreach { partition =>
+      taskSetExclude2.updateExcludedForFailedTask(
+        "hostA", exec = "1", index = partition, failureReason = "testing")
+    }
+    healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, taskSetExclude2.execToFailures)
+
+    val msg1 =
+      "Killing excluded executor id 1 since spark.excludeOnFailure.killExcludedExecutors is set."
+
+    verify(allocationClientMock).decommissionExecutor(
+      "1", ExecutorDecommissionInfo(msg1), false)
+
+    val taskSetExclude3 = createTaskSetExcludelist(stageId = 1)
+    // Fail 4 tasks in one task set on executor 2, so that executor gets excluded for the whole
+    // application.  Since that's the second executor that is excluded on the same node, we also
+    // exclude that node.
+    (0 until 4).foreach { partition =>
+      taskSetExclude3.updateExcludedForFailedTask(
+        "hostA", exec = "2", index = partition, failureReason = "testing")
+    }
+    healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, taskSetExclude3.execToFailures)
+
+    val msg2 =
+      "Killing excluded executor id 2 since spark.excludeOnFailure.killExcludedExecutors is set."
+    verify(allocationClientMock).decommissionExecutor(
+      "2", ExecutorDecommissionInfo(msg2), false, false)
+    verify(allocationClientMock).decommissionExecutorsOnHost(
+      "hostA")

Review comment:
       nit: turn into one line?
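
i.e., roughly:

```scala
verify(allocationClientMock).decommissionExecutorsOnHost("hostA")
```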




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
