holdenk commented on a change in pull request #29367: URL: https://github.com/apache/spark/pull/29367#discussion_r468763396
########## File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ########## @@ -503,6 +504,102 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp protected def minRegisteredRatio: Double = _minRegisteredRatio + /** + * Request that the cluster manager decommission the specified executors. + * + * @param executorsAndDecomInfo Identifiers of executors & decommission info. + * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down + * after these executors have been decommissioned. + * @return the ids of the executors acknowledged by the cluster manager to be removed. + */ + override def decommissionExecutors( + executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)], + adjustTargetNumExecutors: Boolean): Seq[String] = { + + val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) => + CoarseGrainedSchedulerBackend.this.synchronized { + // Only bother decommissioning executors which are alive. + if (isExecutorActive(executorId)) { + executorsPendingDecommission += executorId + true + } else { + false + } + } + } + + // If we don't want to replace the executors we are decommissioning + if (adjustTargetNumExecutors) { + executorsToDecommission.foreach { case (exec, _) => + val rpId = withLock { + executorDataMap(exec).resourceProfileId + } + val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId) + if (requestedTotalExecutorsPerResourceProfile.isEmpty) { + // Assume that we are killing an executor that was started by default and + // not through the request api + requestedTotalExecutorsPerResourceProfile(rp) = 0 + } else { + val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp) + requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0) + } + } + doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap) + } + + val decommissioned = executorsToDecommission.filter { case (executorId, decomInfo) => + doDecommission(executorId, decomInfo) + }.map(_._1) + decommissioned + } + + + private def doDecommission(executorId: String, Review comment: Ok I cleaned up the duplicated function, I'll poke at the possible double driver endpoint send this morning. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org