Github user brad-kaiser commented on a diff in the pull request: https://github.com/apache/spark/pull/19041#discussion_r155031960 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -416,63 +423,52 @@ private[spark] class ExecutorAllocationManager( * Request the cluster manager to remove the given executors. * Returns the list of executors which are removed. */ - private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized { - val executorIdsToBeRemoved = new ArrayBuffer[String] - + private def removeExecutors(executors: Seq[String]): Unit = synchronized { logInfo("Request to remove executorIds: " + executors.mkString(", ")) - val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size - - var newExecutorTotal = numExistingExecutors - executors.foreach { executorIdToBeRemoved => - if (newExecutorTotal - 1 < minNumExecutors) { - logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " + - s"$newExecutorTotal executor(s) left (minimum number of executor limit $minNumExecutors)") - } else if (newExecutorTotal - 1 < numExecutorsTarget) { - logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " + - s"$newExecutorTotal executor(s) left (number of executor target $numExecutorsTarget)") - } else if (canBeKilled(executorIdToBeRemoved)) { - executorIdsToBeRemoved += executorIdToBeRemoved - newExecutorTotal -= 1 - } - } + val numExistingExecs = allocationManager.executorIds.size - executorsPendingToRemove.size + val execCountFloor = math.max(minNumExecutors, numExecutorsTarget) + val (executorIdsToBeRemoved, dontRemove) = executors + .filter(canBeKilled) + .splitAt(numExistingExecs - execCountFloor) - if (executorIdsToBeRemoved.isEmpty) { - return Seq.empty[String] + dontRemove.foreach { execId => + logDebug(s"Not removing idle executor $execId because it " + + s"would put us below the minimum limit of $minNumExecutors executors" + + s"or number of target executors $numExecutorsTarget") } - // Send a request to the backend to kill this executor(s) - val executorsRemoved = if (testing) { - executorIdsToBeRemoved + if (executorIdsToBeRemoved.isEmpty) { + Seq.empty[String] + } else if (testing) { + recordExecutorKill(executorIdsToBeRemoved) + } else if (recoverCachedData) { + logDebug(s"Starting replicate process for $executorIdsToBeRemoved") + client.markPendingToRemove(executorIdsToBeRemoved) + recordExecutorKill(executorIdsToBeRemoved) + cacheRecoveryManager.startExecutorKill(executorIdsToBeRemoved) } else { - client.killExecutors(executorIdsToBeRemoved) + val killed = killExecutors(executorIdsToBeRemoved) + recordExecutorKill(killed) } - // [SPARK-21834] killExecutors api reduces the target number of executors. - // So we need to update the target with desired value. - client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount) - // reset the newExecutorTotal to the existing number of executors - newExecutorTotal = numExistingExecutors - if (testing || executorsRemoved.nonEmpty) { - executorsRemoved.foreach { removedExecutorId => - newExecutorTotal -= 1 - logInfo(s"Removing executor $removedExecutorId because it has been idle for " + - s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)") - executorsPendingToRemove.add(removedExecutorId) - } - executorsRemoved - } else { + } + + def killExecutors(executorIds: Seq[String], forceIfPending: Boolean = false): Seq[String] = { + logDebug(s"Starting kill process for $executorIds") + val result = client.killExecutors(executorIds, forceIfPending = forceIfPending) + if (result.isEmpty) { logWarning(s"Unable to reach the cluster manager to kill executor/s " + - s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible to kill!") - Seq.empty[String] + s"${executorIds.mkString(",")} or no executor eligible to kill!") } + result } - /** - * Request the cluster manager to remove the given executor. - * Return whether the request is acknowledged. - */ - private def removeExecutor(executorId: String): Boolean = synchronized { - val executorsRemoved = removeExecutors(Seq(executorId)) - executorsRemoved.nonEmpty && executorsRemoved(0) == executorId + private def recordExecutorKill(executorsRemoved: Seq[String]): Unit = synchronized { + // [SPARK-21834] killExecutors api reduces the target number of executors. + // So we need to update the target with desired value. + client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount) + executorsPendingToRemove ++= executorsRemoved + logInfo(s"Removing executor $executorsRemoved because it has been idle for " + --- End diff -- fixed
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org