agrawaldevesh commented on a change in pull request #29367: URL: https://github.com/apache/spark/pull/29367#discussion_r468882865
########## File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ########## @@ -503,6 +450,88 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp protected def minRegisteredRatio: Double = _minRegisteredRatio + /** + * Request that the cluster manager decommission the specified executors. + * + * @param executorsAndDecomInfo Identifiers of executors & decommission info. + * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down + * after these executors have been decommissioned. + * @return the ids of the executors acknowledged by the cluster manager to be removed. + */ + override def decommissionExecutors( + executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)], + adjustTargetNumExecutors: Boolean): Seq[String] = { + + val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) => + CoarseGrainedSchedulerBackend.this.synchronized { + // Only bother decommissioning executors which are alive. + if (isExecutorActive(executorId)) { + executorsPendingDecommission += executorId + true + } else { + false + } + } + } + + // If we don't want to replace the executors we are decommissioning + if (adjustTargetNumExecutors) { + adjustExecutors(executorsToDecommission.map(_._1)) + } + + val decommissioned = executorsToDecommission.filter { case (executorId, decomInfo) => + doDecommission(executorId, decomInfo) + }.map(_._1) + decommissioned + } + + + private def doDecommission(executorId: String, + decomInfo: ExecutorDecommissionInfo): Boolean = { + + logInfo(s"Asking executor $executorId to decommissioning.") + try { + scheduler.executorDecommission(executorId, decomInfo) + if (driverEndpoint != null) { + logInfo("Propagating executor decommission to driver.") + driverEndpoint.send(DecommissionExecutor(executorId, decomInfo)) Review comment: I believe there is still a cycle here: Please trace through how `DecommissionExecutor` message is handled: It will eventually call this doDecommission which will send the message again ... If I understand correctly, this may end up live-locking the driver until the poor executor actually dies for good. One way to break this cycle is to directly call `doDecommission` in the handlers for `DecommissionExecutor` in this class's `receive` and `receiveAndReply` methods, with a special flag that forbids re-sending the message. [blocker] ########## File path: core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala ########## @@ -242,8 +242,10 @@ class DecommissionWorkerSuite assert(jobResult === 2) } // 6 tasks: 2 from first stage, 2 rerun again from first stage, 2nd stage attempt 1 and 2. - val tasksSeen = listener.getTasksFinished() Review comment: Would you happen to recall the github actions error you got that lead to this change ? I would like to dig further because I invoke the listener using TestUtils.withListener(sc, listener): Which waits for the listener to drain and also removes the listener. So I don't think wrapping this in an eventually should actually be doing anything: The listener has already been removed. Perhaps I ought to bring back the the "waiting for job done" inside of the getTasksFinished or as a separate call. I would like to understand further just so that I can learn about some of the gotchas with this listener stuff. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org