agrawaldevesh commented on a change in pull request #29367: URL: https://github.com/apache/spark/pull/29367#discussion_r467485361
########## File path: core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala ########## @@ -326,18 +352,33 @@ private[spark] class ExecutorMonitor( val removed = executors.remove(event.executorId) if (removed != null) { decrementExecResourceProfileCount(removed.resourceProfileId) - if (!removed.pendingRemoval) { + if (!removed.pendingRemoval || !removed.decommissioning) { Review comment: I am just trying to follow along with this code, so pardon me if this is a n00b question: why are we tracking pendingRemoval and decommissioning separately? Two questions about that: - If an executor is marked as decommissioned here, when is it actually removed? (Outside of dynamic allocation, that happens when the executor naturally has a heartbeat failure.) - Is my understanding correct that if graceful decommissioning is plugged into dynamic allocation (this feature) AND the cluster manager supports decommissioning, then pendingRemoval would be empty -- i.e., executors would only be decommissioned? ########## File path: core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala ########## @@ -298,6 +323,7 @@ private[spark] class ExecutorMonitor( // // This means that an executor may be marked as having shuffle data, and thus prevented // from being removed, even though the data may not be used. + // TODO: Only track used files (SPARK-31974) Review comment: Is this comment change intended?
########## File path: core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala ########## @@ -326,18 +352,33 @@ private[spark] class ExecutorMonitor( val removed = executors.remove(event.executorId) if (removed != null) { decrementExecResourceProfileCount(removed.resourceProfileId) - if (!removed.pendingRemoval) { + if (!removed.pendingRemoval || !removed.decommissioning) { nextTimeout.set(Long.MinValue) } } } override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = { - if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) { - return - } val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId, UNKNOWN_RESOURCE_PROFILE_ID) + + // Check if it is a shuffle file, or RDD to pick the correct codepath for update + if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] && shuffleTrackingEnabled) { + /** + * The executor monitor keeps track of locations of cache and shuffle blocks and this can be + * used to decide which executor(s) Spark should shutdown first. Since we move shuffle blocks + * around now this wires it up so that it keeps track of it. We only do this for data blocks + * as index and other blocks blocks do not necessarily mean the entire block has been + * committed. + */ + event.blockUpdatedInfo.blockId match { + case ShuffleDataBlockId(shuffleId, _, _) => exec.addShuffle(shuffleId) + case _ => // For now we only update on data blocks + } + return + } else if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) { Review comment: Just a code nit to avoid two `return`s: would it make sense to put this shuffle block check inside the if-branch of not-instanceof-RDDBlockId? Like: ``` if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) { if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] && shuffleTrackingEnabled) { ....
+ } return } ``` ########## File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ########## @@ -503,6 +504,102 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp protected def minRegisteredRatio: Double = _minRegisteredRatio + /** + * Request that the cluster manager decommission the specified executors. + * + * @param executorsAndDecomInfo Identifiers of executors & decommission info. + * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down + * after these executors have been decommissioned. + * @return the ids of the executors acknowledged by the cluster manager to be removed. + */ + override def decommissionExecutors( + executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)], + adjustTargetNumExecutors: Boolean): Seq[String] = { + + val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) => + CoarseGrainedSchedulerBackend.this.synchronized { + // Only bother decommissioning executors which are alive. + if (isExecutorActive(executorId)) { + executorsPendingDecommission += executorId + true + } else { + false + } + } + } + + // If we don't want to replace the executors we are decommissioning + if (adjustTargetNumExecutors) { Review comment: Is the comment above accurate? It seems we are indeed replacing the executors that are decommissioned when adjustTargetNumExecutors = true. On a related note, should `adjustTargetNumExecutors` simply be renamed to `replaceDecommissionedExecutors` to make its meaning more direct?
########## File path: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala ########## @@ -133,7 +135,13 @@ private[streaming] class ExecutorAllocationManager( logDebug(s"Removable executors (${removableExecIds.size}): ${removableExecIds}") if (removableExecIds.nonEmpty) { val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size)) - client.killExecutor(execIdToRemove) + if (conf.get(WORKER_DECOMMISSION_ENABLED)) { + client.decommissionExecutor(execIdToRemove, + ExecutorDecommissionInfo("spark scale down", false), + adjustTargetNumExecutors = true) Review comment: I feel that this is the ONLY place where adjustTargetNumExecutors should be set to true. ########## File path: streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala ########## @@ -83,12 +96,26 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase Map.empty)} } - /** Verify that a particular executor was killed */ + /** Verify that a particular executor was scaled down. */ def verifyKilledExec(expectedKilledExec: Option[String]): Unit = { Review comment: Do you want to rename the method to verifyScaledDownExec so that it matches the comment change? ########## File path: core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala ########## @@ -76,7 +76,9 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { // decom.sh message passing is tested manually. val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] val execs = sched.getExecutorIds() - execs.foreach(execId => sched.decommissionExecutor(execId, ExecutorDecommissionInfo("", false))) + // Make the executors decommission, finish, exit, and not be replaced.
+ val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", false))) + sched.decommissionExecutors(execsAndDecomInfo, adjustTargetNumExecutors = true) Review comment: I am confused here: `adjustTargetNumExecutors = true` means that the executor should be replaced (IIUC), whereas the comment above says "not be replaced". ########## File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala ########## @@ -190,7 +190,8 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS logInfo(s"Decommissioning executor ${execToDecommission}") sched.decommissionExecutor( execToDecommission, - ExecutorDecommissionInfo("", isHostDecommissioned = false)) + ExecutorDecommissionInfo("", isHostDecommissioned = false), + adjustTargetNumExecutors = true) Review comment: Why is this true and not false? We explicitly want to kill and discard the executor here without replacing it. The test does not truly care, but still, why the change? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org