This is an automated email from the ASF dual-hosted git repository. holden pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 548ac7c [SPARK-31198][CORE] Use graceful decommissioning as part of dynamic scaling 548ac7c is described below commit 548ac7c4af2270a6bdbf7a6f29f4846eecdc0171 Author: Holden Karau <hka...@apple.com> AuthorDate: Wed Aug 12 17:07:18 2020 -0700 [SPARK-31198][CORE] Use graceful decommissioning as part of dynamic scaling ### What changes were proposed in this pull request? If graceful decommissioning is enabled, Spark's dynamic scaling uses graceful decommissioning instead of directly killing executors. ### Why are the changes needed? When scaling down, Spark should avoid triggering recomputes as much as possible. ### Does this PR introduce _any_ user-facing change? Hopefully users' jobs run faster, or at least at the same speed. It also enables experimental shuffle-service-free dynamic scaling when graceful decommissioning is enabled (using the same code as the shuffle tracking dynamic scaling). ### How was this patch tested? For now I've extended the ExecutorAllocationManagerSuite for both core & streaming. Closes #29367 from holdenk/SPARK-31198-use-graceful-decommissioning-as-part-of-dynamic-scaling. 
Lead-authored-by: Holden Karau <hka...@apple.com> Co-authored-by: Holden Karau <hol...@pigscanfly.ca> Signed-off-by: Holden Karau <hka...@apple.com> --- .../apache/spark/ExecutorAllocationClient.scala | 40 +++++ .../apache/spark/ExecutorAllocationManager.scala | 30 +++- .../cluster/CoarseGrainedSchedulerBackend.scala | 190 ++++++++++++--------- .../cluster/StandaloneSchedulerBackend.scala | 3 +- .../spark/scheduler/dynalloc/ExecutorMonitor.scala | 61 ++++++- .../org/apache/spark/storage/BlockManager.scala | 2 +- .../apache/spark/storage/BlockManagerMaster.scala | 6 +- .../spark/ExecutorAllocationManagerSuite.scala | 71 +++++++- .../WorkerDecommissionExtendedSuite.scala | 3 +- .../spark/scheduler/WorkerDecommissionSuite.scala | 4 +- .../BlockManagerDecommissionIntegrationSuite.scala | 7 +- project/SparkBuild.scala | 2 + .../docker/src/main/dockerfiles/spark/decom.sh | 2 +- .../k8s/integrationtest/KubernetesSuite.scala | 27 ++- .../integration-tests/tests/decommissioning.py | 5 - .../scheduler/ExecutorAllocationManager.scala | 10 +- .../scheduler/ExecutorAllocationManagerSuite.scala | 51 ++++-- 17 files changed, 380 insertions(+), 134 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index 00bd006..079340a 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -17,6 +17,8 @@ package org.apache.spark +import org.apache.spark.scheduler.ExecutorDecommissionInfo + /** * A client that communicates with the cluster manager to request or kill executors. * This is currently supported only in YARN mode. @@ -82,6 +84,44 @@ private[spark] trait ExecutorAllocationClient { force: Boolean = false): Seq[String] /** + * Request that the cluster manager decommission the specified executors. 
+ * Default implementation delegates to kill, scheduler must override + * if it supports graceful decommissioning. + * + * @param executorsAndDecominfo identifiers of executors & decom info. + * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down + * after these executors have been decommissioned. + * @return the ids of the executors acknowledged by the cluster manager to be removed. + */ + def decommissionExecutors( + executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)], + adjustTargetNumExecutors: Boolean): Seq[String] = { + killExecutors(executorsAndDecomInfo.map(_._1), + adjustTargetNumExecutors, + countFailures = false) + } + + + /** + * Request that the cluster manager decommission the specified executor. + * Delegates to decommissionExecutors. + * + * @param executorId identifiers of executor to decommission + * @param decommissionInfo information about the decommission (reason, host loss) + * @param adjustTargetNumExecutors if we should adjust the target number of executors. + * @return whether the request is acknowledged by the cluster manager. + */ + final def decommissionExecutor(executorId: String, + decommissionInfo: ExecutorDecommissionInfo, + adjustTargetNumExecutors: Boolean): Boolean = { + val decommissionedExecutors = decommissionExecutors( + Array((executorId, decommissionInfo)), + adjustTargetNumExecutors = adjustTargetNumExecutors) + decommissionedExecutors.nonEmpty && decommissionedExecutors(0).equals(executorId) + } + + + /** * Request that the cluster manager kill every executor on the specified host. * * @return whether the request is acknowledged by the cluster manager. 
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 1570f86..1cb840f 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -28,6 +28,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL +import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED import org.apache.spark.metrics.source.Source import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID import org.apache.spark.resource.ResourceProfileManager @@ -127,6 +128,8 @@ private[spark] class ExecutorAllocationManager( private val executorAllocationRatio = conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO) + private val decommissionEnabled = conf.get(WORKER_DECOMMISSION_ENABLED) + private val defaultProfileId = resourceProfileManager.defaultResourceProfile.id validateSettings() @@ -204,7 +207,12 @@ private[spark] class ExecutorAllocationManager( s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!") } if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) { - if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) { + // If dynamic allocation shuffle tracking or worker decommissioning along with + // storage shuffle decommissioning is enabled we have *experimental* support for + // decommissioning without a shuffle service. 
+ if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) || + (decommissionEnabled && + conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) { logWarning("Dynamic allocation without a shuffle service is an experimental feature.") } else if (!testing) { throw new SparkException("Dynamic allocation of executors requires the external " + @@ -539,7 +547,9 @@ private[spark] class ExecutorAllocationManager( // get the running total as we remove or initialize it to the count - pendingRemoval val newExecutorTotal = numExecutorsTotalPerRpId.getOrElseUpdate(rpId, (executorMonitor.executorCountWithResourceProfile(rpId) - - executorMonitor.pendingRemovalCountPerResourceProfileId(rpId))) + executorMonitor.pendingRemovalCountPerResourceProfileId(rpId) - + executorMonitor.decommissioningPerResourceProfileId(rpId) + )) if (newExecutorTotal - 1 < minNumExecutors) { logDebug(s"Not removing idle executor $executorIdToBeRemoved because there " + s"are only $newExecutorTotal executor(s) left (minimum number of executor limit " + @@ -565,8 +575,14 @@ private[spark] class ExecutorAllocationManager( } else { // We don't want to change our target number of executors, because we already did that // when the task backlog decreased. - client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false, - countFailures = false, force = false) + if (decommissionEnabled) { + val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map( + id => (id, ExecutorDecommissionInfo("spark scale down", false))).toArray + client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false) + } else { + client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false, + countFailures = false, force = false) + } } // [SPARK-21834] killExecutors api reduces the target number of executors. 
@@ -578,7 +594,11 @@ private[spark] class ExecutorAllocationManager( // reset the newExecutorTotal to the existing number of executors if (testing || executorsRemoved.nonEmpty) { - executorMonitor.executorsKilled(executorsRemoved.toSeq) + if (decommissionEnabled) { + executorMonitor.executorsDecommissioned(executorsRemoved) + } else { + executorMonitor.executorsKilled(executorsRemoved.toSeq) + } logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.") executorsRemoved.toSeq } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 200f2d8..ca65731 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -193,7 +193,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case DecommissionExecutor(executorId, decommissionInfo) => logError(s"Received decommission executor message ${executorId}: $decommissionInfo") - decommissionExecutor(executorId, decommissionInfo) + decommissionExecutor(executorId, decommissionInfo, adjustTargetNumExecutors = false) case RemoveWorker(workerId, host, message) => removeWorker(workerId, host, message) @@ -274,8 +274,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case DecommissionExecutor(executorId, decommissionInfo) => logError(s"Received decommission executor message ${executorId}: ${decommissionInfo}.") - decommissionExecutor(executorId, decommissionInfo) - context.reply(true) + context.reply(decommissionExecutor(executorId, decommissionInfo, + adjustTargetNumExecutors = false)) case RetrieveSparkAppConfig(resourceProfileId) => val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(resourceProfileId) @@ -420,59 +420,6 @@ class 
CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } /** - * Mark a given executor as decommissioned and stop making resource offers for it. - */ - private def decommissionExecutor( - executorId: String, decommissionInfo: ExecutorDecommissionInfo): Boolean = { - val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized { - // Only bother decommissioning executors which are alive. - if (isExecutorActive(executorId)) { - executorsPendingDecommission += executorId - true - } else { - false - } - } - - if (shouldDisable) { - logInfo(s"Starting decommissioning executor $executorId.") - try { - scheduler.executorDecommission(executorId, decommissionInfo) - } catch { - case e: Exception => - logError(s"Unexpected error during decommissioning ${e.toString}", e) - } - // Send decommission message to the executor, this may be a duplicate since the executor - // could have been the one to notify us. But it's also possible the notification came from - // elsewhere and the executor does not yet know. - executorDataMap.get(executorId) match { - case Some(executorInfo) => - executorInfo.executorEndpoint.send(DecommissionSelf) - case None => - // Ignoring the executor since it is not registered. 
- logWarning(s"Attempted to decommission unknown executor $executorId.") - } - logInfo(s"Finished decommissioning executor $executorId.") - - if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { - try { - logInfo("Starting decommissioning block manager corresponding to " + - s"executor $executorId.") - scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId)) - } catch { - case e: Exception => - logError("Unexpected error during block manager " + - s"decommissioning for executor $executorId: ${e.toString}", e) - } - logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.") - } - } else { - logInfo(s"Skipping decommissioning of executor $executorId.") - } - shouldDisable - } - - /** * Stop making resource offers for the given executor. The executor is marked as lost with * the loss reason still pending. * @@ -503,6 +450,87 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp protected def minRegisteredRatio: Double = _minRegisteredRatio + /** + * Request that the cluster manager decommission the specified executors. + * + * @param executorsAndDecomInfo Identifiers of executors & decommission info. + * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down + * after these executors have been decommissioned. + * @return the ids of the executors acknowledged by the cluster manager to be removed. + */ + override def decommissionExecutors( + executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)], + adjustTargetNumExecutors: Boolean): Seq[String] = { + + val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) => + CoarseGrainedSchedulerBackend.this.synchronized { + // Only bother decommissioning executors which are alive. 
+ if (isExecutorActive(executorId)) { + executorsPendingDecommission += executorId + true + } else { + false + } + } + } + + // If we don't want to replace the executors we are decommissioning + if (adjustTargetNumExecutors) { + adjustExecutors(executorsToDecommission.map(_._1)) + } + + executorsToDecommission.filter { case (executorId, decomInfo) => + doDecommission(executorId, decomInfo) + }.map(_._1) + } + + + private def doDecommission(executorId: String, + decomInfo: ExecutorDecommissionInfo): Boolean = { + + logInfo(s"Asking executor $executorId to decommissioning.") + try { + scheduler.executorDecommission(executorId, decomInfo) + if (driverEndpoint != null) { + logInfo("Propagating executor decommission to driver.") + driverEndpoint.send(DecommissionExecutor(executorId, decomInfo)) + } + } catch { + case e: Exception => + logError(s"Unexpected error during decommissioning ${e.toString}", e) + return false + } + // Send decommission message to the executor (it could have originated on the executor + // but not necessarily. + CoarseGrainedSchedulerBackend.this.synchronized { + executorDataMap.get(executorId) match { + case Some(executorInfo) => + executorInfo.executorEndpoint.send(DecommissionSelf) + case None => + // Ignoring the executor since it is not registered. 
+ logWarning(s"Attempted to decommission unknown executor $executorId.") + return false + } + } + logInfo(s"Asked executor $executorId to decommission.") + + if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { + try { + logInfo(s"Asking block manager corresponding to executor $executorId to decommission.") + scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId)) + } catch { + case e: Exception => + logError("Unexpected error during block manager " + + s"decommissioning for executor $executorId: ${e.toString}", e) + return false + } + logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.") + } + + true + } + + override def start(): Unit = { if (UserGroupInformation.isSecurityEnabled()) { delegationTokenManager = createTokenManager() @@ -598,17 +626,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp driverEndpoint.send(RemoveWorker(workerId, host, message)) } - /** - * Called by subclasses when notified of a decommissioning executor. - */ - private[spark] def decommissionExecutor( - executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = { - if (driverEndpoint != null) { - logInfo("Propagating executor decommission to driver.") - driverEndpoint.send(DecommissionExecutor(executorId, decommissionInfo)) - } - } - def sufficientResourcesRegistered(): Boolean = true override def isReady(): Boolean = { @@ -761,6 +778,31 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp Future.successful(false) /** + * Adjust the number of executors being requested to no longer include the provided executors. 
+ */ + private def adjustExecutors(executorIds: Seq[String]) = { + if (executorIds.nonEmpty) { + executorIds.foreach { exec => + withLock { + val rpId = executorDataMap(exec).resourceProfileId + val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId) + if (requestedTotalExecutorsPerResourceProfile.isEmpty) { + // Assume that we are killing an executor that was started by default and + // not through the request api + requestedTotalExecutorsPerResourceProfile(rp) = 0 + } else { + val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp) + requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0) + } + } + } + doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap) + } else { + Future.successful(true) + } + } + + /** * Request that the cluster manager kill the specified executors. * * @param executorIds identifiers of executors to kill @@ -798,19 +840,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // take into account executors that are pending to be added or removed. 
val adjustTotalExecutors = if (adjustTargetNumExecutors) { - executorsToKill.foreach { exec => - val rpId = executorDataMap(exec).resourceProfileId - val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId) - if (requestedTotalExecutorsPerResourceProfile.isEmpty) { - // Assume that we are killing an executor that was started by default and - // not through the request api - requestedTotalExecutorsPerResourceProfile(rp) = 0 - } else { - val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp) - requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0) - } - } - doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap) + adjustExecutors(executorsToKill) } else { Future.successful(true) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index d921af6..3acb6f1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -176,7 +176,8 @@ private[spark] class StandaloneSchedulerBackend( override def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo) { logInfo("Asked to decommission executor") - decommissionExecutor(fullId.split("/")(1), decommissionInfo) + val execId = fullId.split("/")(1) + decommissionExecutors(Array((execId, decommissionInfo)), adjustTargetNumExecutors = false) logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo)) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index 4d71907..8dbdc84 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID import org.apache.spark.scheduler._ -import org.apache.spark.storage.RDDBlockId +import org.apache.spark.storage.{RDDBlockId, ShuffleDataBlockId} import org.apache.spark.util.Clock /** @@ -114,7 +114,8 @@ private[spark] class ExecutorMonitor( var newNextTimeout = Long.MaxValue timedOutExecs = executors.asScala - .filter { case (_, exec) => !exec.pendingRemoval && !exec.hasActiveShuffle } + .filter { case (_, exec) => + !exec.pendingRemoval && !exec.hasActiveShuffle && !exec.decommissioning} .filter { case (_, exec) => val deadline = exec.timeoutAt if (deadline > now) { @@ -135,6 +136,7 @@ private[spark] class ExecutorMonitor( /** * Mark the given executors as pending to be removed. Should only be called in the EAM thread. + * This covers both kills and decommissions. */ def executorsKilled(ids: Seq[String]): Unit = { ids.foreach { id => @@ -149,6 +151,19 @@ private[spark] class ExecutorMonitor( nextTimeout.set(Long.MinValue) } + private[spark] def executorsDecommissioned(ids: Seq[String]): Unit = { + ids.foreach { id => + val tracker = executors.get(id) + if (tracker != null) { + tracker.decommissioning = true + } + } + + // Recompute timed out executors in the next EAM callback, since this call invalidates + // the current list. 
+ nextTimeout.set(Long.MinValue) + } + def executorCount: Int = executors.size() def executorCountWithResourceProfile(id: Int): Int = { @@ -171,6 +186,16 @@ private[spark] class ExecutorMonitor( executors.asScala.filter { case (k, v) => v.resourceProfileId == id && v.pendingRemoval }.size } + def decommissioningCount: Int = executors.asScala.count { case (_, exec) => + exec.decommissioning + } + + def decommissioningPerResourceProfileId(id: Int): Int = { + executors.asScala.filter { case (k, v) => + v.resourceProfileId == id && v.decommissioning + }.size + } + override def onJobStart(event: SparkListenerJobStart): Unit = { if (!shuffleTrackingEnabled) { return @@ -298,6 +323,7 @@ private[spark] class ExecutorMonitor( // // This means that an executor may be marked as having shuffle data, and thus prevented // from being removed, even though the data may not be used. + // TODO: Only track used files (SPARK-31974) if (shuffleTrackingEnabled && event.reason == Success) { stageToShuffleID.get(event.stageId).foreach { shuffleId => exec.addShuffle(shuffleId) @@ -326,18 +352,35 @@ private[spark] class ExecutorMonitor( val removed = executors.remove(event.executorId) if (removed != null) { decrementExecResourceProfileCount(removed.resourceProfileId) - if (!removed.pendingRemoval) { + if (!removed.pendingRemoval || !removed.decommissioning) { nextTimeout.set(Long.MinValue) } } } override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = { + val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId, + UNKNOWN_RESOURCE_PROFILE_ID) + + // Check if it is a shuffle file, or RDD to pick the correct codepath for update if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) { + if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] && + shuffleTrackingEnabled) { + /** + * The executor monitor keeps track of locations of cache and shuffle blocks and this can + * be used to decide which executor(s) Spark should shutdown first. 
Since we move shuffle + * blocks around now this wires it up so that it keeps track of it. We only do this for + * data blocks as index and other blocks blocks do not necessarily mean the entire block + * has been committed. + */ + event.blockUpdatedInfo.blockId match { + case ShuffleDataBlockId(shuffleId, _, _) => exec.addShuffle(shuffleId) + case _ => // For now we only update on data blocks + } + } return } - val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId, - UNKNOWN_RESOURCE_PROFILE_ID) + val storageLevel = event.blockUpdatedInfo.storageLevel val blockId = event.blockUpdatedInfo.blockId.asInstanceOf[RDDBlockId] @@ -410,10 +453,15 @@ private[spark] class ExecutorMonitor( } // Visible for testing - def executorsPendingToRemove(): Set[String] = { + private[spark] def executorsPendingToRemove(): Set[String] = { executors.asScala.filter { case (_, exec) => exec.pendingRemoval }.keys.toSet } + // Visible for testing + private[spark] def executorsDecommissioning(): Set[String] = { + executors.asScala.filter { case (_, exec) => exec.decommissioning }.keys.toSet + } + /** * This method should be used when updating executor state. 
It guards against a race condition in * which the `SparkListenerTaskStart` event is posted before the `SparkListenerBlockManagerAdded` @@ -466,6 +514,7 @@ private[spark] class ExecutorMonitor( @volatile var timedOut: Boolean = false var pendingRemoval: Boolean = false + var decommissioning: Boolean = false var hasActiveShuffle: Boolean = false private var idleStart: Long = -1 diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6ec93df..ee534f5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1822,7 +1822,7 @@ private[spark] class BlockManager( } } - /* + /** * Returns the last migration time and a boolean denoting if all the blocks have been migrated. * If there are any tasks running since that time the boolean may be incorrect. */ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 93492cc..f544d47 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -43,9 +43,11 @@ class BlockManagerMaster( logInfo("Removed " + execId + " successfully in removeExecutor") } - /** Decommission block managers corresponding to given set of executors */ + /** Decommission block managers corresponding to given set of executors + * Non-blocking. 
+ */ def decommissionBlockManagers(executorIds: Seq[String]): Unit = { - driverEndpoint.ask[Unit](DecommissionBlockManagers(executorIds)) + driverEndpoint.ask[Boolean](DecommissionBlockManagers(executorIds)) } /** Get Replication Info for all the RDD blocks stored in given blockManagerId */ diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 5b367d2..3abe051 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -28,6 +28,7 @@ import org.scalatest.PrivateMethodTester import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.config import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL +import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED import org.apache.spark.metrics.MetricsSystem import org.apache.spark.resource._ import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID @@ -1270,6 +1271,68 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { assert(executorsPendingToRemove(manager).size === 6) // limit reached (1 executor remaining) } + test("mock polling loop remove with decommissioning") { + val clock = new ManualClock(2020L) + val manager = createManager(createConf(1, 20, 1, true), clock = clock) + + // Remove idle executors on timeout + onExecutorAddedDefaultProfile(manager, "executor-1") + onExecutorAddedDefaultProfile(manager, "executor-2") + onExecutorAddedDefaultProfile(manager, "executor-3") + assert(executorsDecommissioning(manager).isEmpty) + assert(executorsPendingToRemove(manager).isEmpty) + + // idle threshold not reached yet + clock.advance(executorIdleTimeout * 1000 / 2) + schedule(manager) + assert(manager.executorMonitor.timedOutExecutors().isEmpty) + assert(executorsPendingToRemove(manager).isEmpty) + 
assert(executorsDecommissioning(manager).isEmpty) + + // idle threshold exceeded + clock.advance(executorIdleTimeout * 1000) + assert(manager.executorMonitor.timedOutExecutors().size === 3) + schedule(manager) + assert(executorsPendingToRemove(manager).isEmpty) // limit reached (1 executor remaining) + assert(executorsDecommissioning(manager).size === 2) // limit reached (1 executor remaining) + + // Mark a subset as busy - only idle executors should be removed + onExecutorAddedDefaultProfile(manager, "executor-4") + onExecutorAddedDefaultProfile(manager, "executor-5") + onExecutorAddedDefaultProfile(manager, "executor-6") + onExecutorAddedDefaultProfile(manager, "executor-7") + assert(manager.executorMonitor.executorCount === 7) + assert(executorsPendingToRemove(manager).isEmpty) // no pending to be removed + assert(executorsDecommissioning(manager).size === 2) // 2 decommissioning + onExecutorBusy(manager, "executor-4") + onExecutorBusy(manager, "executor-5") + onExecutorBusy(manager, "executor-6") // 3 busy and 2 idle (of the 5 active ones) + + // after scheduling, the previously timed out executor should be removed, since + // there are new active ones. 
+ schedule(manager) + assert(executorsDecommissioning(manager).size === 3) + + // advance the clock so that idle executors should time out and move to the pending list + clock.advance(executorIdleTimeout * 1000) + schedule(manager) + assert(executorsPendingToRemove(manager).size === 0) + assert(executorsDecommissioning(manager).size === 4) + assert(!executorsDecommissioning(manager).contains("executor-4")) + assert(!executorsDecommissioning(manager).contains("executor-5")) + assert(!executorsDecommissioning(manager).contains("executor-6")) + + // Busy executors are now idle and should be removed + onExecutorIdle(manager, "executor-4") + onExecutorIdle(manager, "executor-5") + onExecutorIdle(manager, "executor-6") + schedule(manager) + assert(executorsDecommissioning(manager).size === 4) + clock.advance(executorIdleTimeout * 1000) + schedule(manager) + assert(executorsDecommissioning(manager).size === 6) // limit reached (1 executor remaining) + } + test("listeners trigger add executors correctly") { val manager = createManager(createConf(1, 20, 1)) assert(addTime(manager) === NOT_SET) @@ -1588,7 +1651,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { private def createConf( minExecutors: Int = 1, maxExecutors: Int = 5, - initialExecutors: Int = 1): SparkConf = { + initialExecutors: Int = 1, + decommissioningEnabled: Boolean = false): SparkConf = { val sparkConf = new SparkConf() .set(config.DYN_ALLOCATION_ENABLED, true) .set(config.DYN_ALLOCATION_MIN_EXECUTORS, minExecutors) @@ -1604,6 +1668,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { // SPARK-22864: effectively disable the allocation schedule by setting the period to a // really long value. 
.set(TEST_SCHEDULE_INTERVAL, 30000L) + .set(WORKER_DECOMMISSION_ENABLED, decommissioningEnabled) sparkConf } @@ -1670,6 +1735,10 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { private def executorsPendingToRemove(manager: ExecutorAllocationManager): Set[String] = { manager.executorMonitor.executorsPendingToRemove() } + + private def executorsDecommissioning(manager: ExecutorAllocationManager): Set[String] = { + manager.executorMonitor.executorsDecommissioning() + } } /** diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala index d95deb1..6bfd3f7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala @@ -65,7 +65,8 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkConte val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] sc.getExecutorIds().tail.foreach { id => - sched.decommissionExecutor(id, ExecutorDecommissionInfo("", false)) + sched.decommissionExecutor(id, ExecutorDecommissionInfo("", false), + adjustTargetNumExecutors = false) assert(rdd3.sortByKey().collect().length === 100) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala index bb0c33a..ea5be21 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -76,7 +76,9 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { // decom.sh message passing is tested manually. 
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] val execs = sched.getExecutorIds() - execs.foreach(execId => sched.decommissionExecutor(execId, ExecutorDecommissionInfo("", false))) + // Make the executors decommission, finish, exit, and not be replaced. + val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", false))).toArray + sched.decommissionExecutors(execsAndDecomInfo, adjustTargetNumExecutors = true) val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds) assert(asyncCountResult === 10) } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala index 7cf0083..82f87a5 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala @@ -188,9 +188,12 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS val execToDecommission = getCandidateExecutorToDecom.get logInfo(s"Decommissioning executor ${execToDecommission}") + + // Decommission executor and ensure it is not relaunched by setting adjustTargetNumExecutors sched.decommissionExecutor( execToDecommission, - ExecutorDecommissionInfo("", isHostDecommissioned = false)) + ExecutorDecommissionInfo("", isHostDecommissioned = false), + adjustTargetNumExecutors = true) val decomTime = new SystemClock().getTimeMillis() // Wait for job to finish. @@ -276,6 +279,8 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS } // Wait for the executor to be removed automatically after migration. + // This is set to a high value since github actions is sometimes high latency + // but I've never seen this go for more than a minute. 
assert(executorRemovedSem.tryAcquire(1, 5L, TimeUnit.MINUTES)) // Since the RDD is cached or shuffled so further usage of same RDD should use the diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 5a3ac21..110c311 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -838,6 +838,8 @@ object Unidoc { f.getCanonicalPath.contains("org/apache/spark/shuffle") && !f.getCanonicalPath.contains("org/apache/spark/shuffle/api"))) .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/executor"))) + .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/ExecutorAllocationClient"))) + .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend"))) .map(_.filterNot(f => f.getCanonicalPath.contains("org/apache/spark/unsafe") && !f.getCanonicalPath.contains("org/apache/spark/unsafe/types/CalendarInterval"))) diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh index 8a5208d..cd973df 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh @@ -32,4 +32,4 @@ timeout 60 tail --pid=${WORKER_PID} -f /dev/null date echo "Done" date -sleep 30 +sleep 1 diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 279386d..28ab371 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -279,6 +279,7 @@ class KubernetesSuite extends SparkFunSuite 
appArgs = appArgs) val execPods = scala.collection.mutable.Map[String, Pod]() + val podsDeleted = scala.collection.mutable.HashSet[String]() val (patienceInterval, patienceTimeout) = { executorPatience match { case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT)) @@ -339,27 +340,21 @@ class KubernetesSuite extends SparkFunSuite } // Delete the pod to simulate cluster scale down/migration. // This will allow the pod to remain up for the grace period - val pod = kubernetesTestComponents.kubernetesClient.pods() - .withName(name) - pod.delete() + kubernetesTestComponents.kubernetesClient.pods() + .withName(name).delete() logDebug(s"Triggered pod decom/delete: $name deleted") - // Look for the string that indicates we should force kill the first - // Executor. This simulates the pod being fully lost. - logDebug("Waiting for second collect...") + // Make sure this pod is deleted Eventually.eventually(TIMEOUT, INTERVAL) { - assert(kubernetesTestComponents.kubernetesClient - .pods() - .withName(driverPodName) - .getLog - .contains("Waiting some more, please kill exec 1."), - "Decommission test did not complete second collect.") + assert(podsDeleted.contains(name)) + } + // Then make sure this pod is replaced + Eventually.eventually(TIMEOUT, INTERVAL) { + assert(execPods.size == 3) } - logDebug("Force deleting") - val podNoGrace = pod.withGracePeriod(0) - podNoGrace.delete() } case Action.DELETED | Action.ERROR => execPods.remove(name) + podsDeleted += name } } }) @@ -388,7 +383,6 @@ class KubernetesSuite extends SparkFunSuite Eventually.eventually(TIMEOUT, patienceInterval) { execPods.values.nonEmpty should be (true) } - execWatcher.close() execPods.values.foreach(executorPodChecker(_)) Eventually.eventually(patienceTimeout, patienceInterval) { expectedLogOnCompletion.foreach { e => @@ -400,6 +394,7 @@ class KubernetesSuite extends SparkFunSuite s"The application did not complete, did not find str ${e}") } } + execWatcher.close() } 
protected def doBasicDriverPodCheck(driverPod: Pod): Unit = { diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py index d34e616..5fcad08 100644 --- a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py +++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py @@ -47,11 +47,6 @@ if __name__ == "__main__": print("...") time.sleep(30) rdd.count() - print("Waiting some more, please kill exec 1.") - print("...") - time.sleep(30) - print("Executor node should be deleted now") - rdd.count() rdd.collect() print("Final accumulator value is: " + str(acc.value)) print("Finished waiting, stopping Spark.") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala index 58bd56c..a4b7b7a2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala @@ -23,7 +23,9 @@ import scala.util.Random import org.apache.spark.{ExecutorAllocationClient, SparkConf} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.Streaming._ +import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED import org.apache.spark.resource.ResourceProfile +import org.apache.spark.scheduler.ExecutorDecommissionInfo import org.apache.spark.streaming.util.RecurringTimer import org.apache.spark.util.{Clock, Utils} @@ -133,7 +135,13 @@ private[streaming] class ExecutorAllocationManager( logDebug(s"Removable executors (${removableExecIds.size}): ${removableExecIds}") if (removableExecIds.nonEmpty) { val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size)) - client.killExecutor(execIdToRemove) + if 
(conf.get(WORKER_DECOMMISSION_ENABLED)) { + client.decommissionExecutor(execIdToRemove, + ExecutorDecommissionInfo("spark scale down", false), + adjustTargetNumExecutors = true) + } else { + client.killExecutor(execIdToRemove) + } logInfo(s"Requested to kill executor $execIdToRemove") } else { logInfo(s"No non-receiver executors to kill") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala index 65efa10..9e06625 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala @@ -27,7 +27,9 @@ import org.scalatestplus.mockito.MockitoSugar import org.apache.spark.{ExecutorAllocationClient, SparkConf} import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_TESTING} import org.apache.spark.internal.config.Streaming._ +import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED import org.apache.spark.resource.ResourceProfile +import org.apache.spark.scheduler.ExecutorDecommissionInfo import org.apache.spark.streaming.{DummyInputDStream, Seconds, StreamingContext, TestSuiteBase} import org.apache.spark.util.{ManualClock, Utils} @@ -44,11 +46,22 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase } test("basic functionality") { + basicTest(decommissioning = false) + } + + test("basic decommissioning") { + basicTest(decommissioning = true) + } + + def basicTest(decommissioning: Boolean): Unit = { // Test that adding batch processing time info to allocation manager // causes executors to be requested and killed accordingly + conf.set(WORKER_DECOMMISSION_ENABLED, decommissioning) // There is 1 receiver, and exec 1 has been allocated to it - withAllocationManager(numReceivers = 1) { case (receiverTracker, 
allocationManager) => + withAllocationManager(numReceivers = 1, conf = conf) { + case (receiverTracker, allocationManager) => + when(receiverTracker.allocatedExecutors).thenReturn(Map(1 -> Some("1"))) /** Add data point for batch processing time and verify executor allocation */ @@ -83,53 +96,67 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase Map.empty)} } - /** Verify that a particular executor was killed */ - def verifyKilledExec(expectedKilledExec: Option[String]): Unit = { - if (expectedKilledExec.nonEmpty) { - verify(allocationClient, times(1)).killExecutor(meq(expectedKilledExec.get)) + /** Verify that a particular executor was scaled down. */ + def verifyScaledDownExec(expectedExec: Option[String]): Unit = { + if (expectedExec.nonEmpty) { + val decomInfo = ExecutorDecommissionInfo("spark scale down", false) + if (decommissioning) { + verify(allocationClient, times(1)).decommissionExecutor( + meq(expectedExec.get), meq(decomInfo), meq(true)) + verify(allocationClient, never).killExecutor(meq(expectedExec.get)) + } else { + verify(allocationClient, times(1)).killExecutor(meq(expectedExec.get)) + verify(allocationClient, never).decommissionExecutor( + meq(expectedExec.get), meq(decomInfo), meq(true)) + } } else { - verify(allocationClient, never).killExecutor(null) + if (decommissioning) { + verify(allocationClient, never).decommissionExecutor(null, null, false) + verify(allocationClient, never).decommissionExecutor(null, null, true) + } else { + verify(allocationClient, never).killExecutor(null) + } } } // Batch proc time = batch interval, should increase allocation by 1 addBatchProcTimeAndVerifyAllocation(batchDurationMillis) { verifyTotalRequestedExecs(Some(3)) // one already allocated, increase allocation by 1 - verifyKilledExec(None) + verifyScaledDownExec(None) } // Batch proc time = batch interval * 2, should increase allocation by 2 addBatchProcTimeAndVerifyAllocation(batchDurationMillis * 2) { verifyTotalRequestedExecs(Some(4)) - 
verifyKilledExec(None) + verifyScaledDownExec(None) } // Batch proc time slightly more than the scale up ratio, should increase allocation by 1 addBatchProcTimeAndVerifyAllocation( batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.defaultValue.get + 1) { verifyTotalRequestedExecs(Some(3)) - verifyKilledExec(None) + verifyScaledDownExec(None) } // Batch proc time slightly less than the scale up ratio, should not change allocation addBatchProcTimeAndVerifyAllocation( batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.defaultValue.get - 1) { verifyTotalRequestedExecs(None) - verifyKilledExec(None) + verifyScaledDownExec(None) } // Batch proc time slightly more than the scale down ratio, should not change allocation addBatchProcTimeAndVerifyAllocation( batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.defaultValue.get + 1) { verifyTotalRequestedExecs(None) - verifyKilledExec(None) + verifyScaledDownExec(None) } // Batch proc time slightly more than the scale down ratio, should not change allocation addBatchProcTimeAndVerifyAllocation( batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.defaultValue.get - 1) { verifyTotalRequestedExecs(None) - verifyKilledExec(Some("2")) + verifyScaledDownExec(Some("2")) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org