holdenk commented on a change in pull request #29367: URL: https://github.com/apache/spark/pull/29367#discussion_r468821063
########## File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ########## @@ -503,6 +504,102 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp protected def minRegisteredRatio: Double = _minRegisteredRatio + /** + * Request that the cluster manager decommission the specified executors. + * + * @param executorsAndDecomInfo Identifiers of executors & decommission info. + * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down + * after these executors have been decommissioned. + * @return the ids of the executors acknowledged by the cluster manager to be removed. + */ + override def decommissionExecutors( + executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)], + adjustTargetNumExecutors: Boolean): Seq[String] = { + + val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) => + CoarseGrainedSchedulerBackend.this.synchronized { + // Only bother decommissioning executors which are alive. 
+ if (isExecutorActive(executorId)) { + executorsPendingDecommission += executorId + true + } else { + false + } + } + } + + // If we don't want to replace the executors we are decommissioning + if (adjustTargetNumExecutors) { + executorsToDecommission.foreach { case (exec, _) => + val rpId = withLock { + executorDataMap(exec).resourceProfileId + } + val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId) + if (requestedTotalExecutorsPerResourceProfile.isEmpty) { + // Assume that we are killing an executor that was started by default and + // not through the request api + requestedTotalExecutorsPerResourceProfile(rp) = 0 + } else { + val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp) + requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0) + } + } + doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap) + } + + val decommissioned = executorsToDecommission.filter { case (executorId, decomInfo) => + doDecommission(executorId, decomInfo) + }.map(_._1) + decommissioned + } + + + private def doDecommission(executorId: String, Review comment: Looking inside the ExecutorAllocationManager (EAM), I see the call to decommission inside removeExecutors, but I'm not seeing the message. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org