Ngone51 commented on a change in pull request #31249: URL: https://github.com/apache/spark/pull/31249#discussion_r568339020
########## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ########## @@ -1942,6 +1950,15 @@ package object config { .timeConf(TimeUnit.SECONDS) .createOptional + private[spark] val EXECUTOR_DECOMMISSION_CLEANUP_INTERVAL = + ConfigBuilder("spark.executor.decommission.cleanupInterval") Review comment: Shall we rename it to `*.decommission.forceKillTimeout`? `Interval` sounds more like a periodic task, while it actually happens only once. ########## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ########## @@ -1942,6 +1950,15 @@ package object config { .timeConf(TimeUnit.SECONDS) .createOptional + private[spark] val EXECUTOR_DECOMMISSION_CLEANUP_INTERVAL = + ConfigBuilder("spark.executor.decommission.cleanupInterval") Review comment: Shall we rename it to `*.decommission.forceKillTimeout`? `Interval` sounds more like it is used for a periodic task, while the kill action here actually happens only once. ########## File path: core/src/main/scala/org/apache/spark/scheduler/HealthTracker.scala ########## @@ -64,6 +65,7 @@ private[scheduler] class HealthTracker ( val EXCLUDE_ON_FAILURE_TIMEOUT_MILLIS = HealthTracker.getExludeOnFailureTimeout(conf) private val EXCLUDE_FETCH_FAILURE_ENABLED = conf.get(config.EXCLUDE_ON_FAILURE_FETCH_FAILURE_ENABLED) + private val decommission = conf.get(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED) Review comment: Shall we keep the same naming pattern here, e.g., `EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED`? Consistent naming usually makes the code here easier for developers to understand.
########## File path: core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala ########## @@ -554,6 +554,50 @@ class HealthTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with Mock verify(allocationClientMock).killExecutorsOnHost("hostA") } + test("excluding decommission and kills executors when enabled") { + val allocationClientMock = mock[ExecutorAllocationClient] + + // verify we decommission when configured + conf.set(config.EXCLUDE_ON_FAILURE_KILL_ENABLED, true) + conf.set(config.DECOMMISSION_ENABLED.key, "true") + conf.set(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED.key, "true") + conf.set(config.MAX_FAILURES_PER_EXEC.key, "1") + conf.set(config.MAX_FAILED_EXEC_PER_NODE.key, "2") + healthTracker = new HealthTracker(listenerBusMock, conf, Some(allocationClientMock), clock) + + // Fail 4 tasks in one task set on executor 1, so that executor gets excluded for the whole + // application. + val taskSetExclude2 = createTaskSetExcludelist(stageId = 0) + (0 until 4).foreach { partition => + taskSetExclude2.updateExcludedForFailedTask( + "hostA", exec = "1", index = partition, failureReason = "testing") + } + healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, taskSetExclude2.execToFailures) + + val msg1 = + "Killing excluded executor id 1 since spark.excludeOnFailure.killExcludedExecutors is set." Review comment: Shall we include a decommission hint in the message (in `killExecutor()`) when decommissioning is enabled? e.g., "Killing (decommission) excluded executor..."
########## File path: core/src/test/scala/org/apache/spark/scheduler/HealthTrackerSuite.scala ########## @@ -554,6 +554,50 @@ class HealthTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with Mock verify(allocationClientMock).killExecutorsOnHost("hostA") } + test("excluding decommission and kills executors when enabled") { + val allocationClientMock = mock[ExecutorAllocationClient] + + // verify we decommission when configured + conf.set(config.EXCLUDE_ON_FAILURE_KILL_ENABLED, true) + conf.set(config.DECOMMISSION_ENABLED.key, "true") + conf.set(config.EXCLUDE_ON_FAILURE_DECOMMISSION_ENABLED.key, "true") + conf.set(config.MAX_FAILURES_PER_EXEC.key, "1") + conf.set(config.MAX_FAILED_EXEC_PER_NODE.key, "2") + healthTracker = new HealthTracker(listenerBusMock, conf, Some(allocationClientMock), clock) + + // Fail 4 tasks in one task set on executor 1, so that executor gets excluded for the whole + // application. + val taskSetExclude2 = createTaskSetExcludelist(stageId = 0) + (0 until 4).foreach { partition => + taskSetExclude2.updateExcludedForFailedTask( + "hostA", exec = "1", index = partition, failureReason = "testing") + } + healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, taskSetExclude2.execToFailures) + + val msg1 = + "Killing excluded executor id 1 since spark.excludeOnFailure.killExcludedExecutors is set." + + verify(allocationClientMock).decommissionExecutor( + "1", ExecutorDecommissionInfo(msg1), false) + + val taskSetExclude3 = createTaskSetExcludelist(stageId = 1) + // Fail 4 tasks in one task set on executor 2, so that executor gets excluded for the whole + // application. Since that's the second executor that is excluded on the same node, we also + // exclude that node. 
+ (0 until 4).foreach { partition => + taskSetExclude3.updateExcludedForFailedTask( + "hostA", exec = "2", index = partition, failureReason = "testing") + } + healthTracker.updateExcludedForSuccessfulTaskSet(0, 0, taskSetExclude3.execToFailures) + + val msg2 = + "Killing excluded executor id 2 since spark.excludeOnFailure.killExcludedExecutors is set." + verify(allocationClientMock).decommissionExecutor( + "2", ExecutorDecommissionInfo(msg2), false, false) + verify(allocationClientMock).decommissionExecutorsOnHost( + "hostA") Review comment: nit: turn into one line? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org