agrawaldevesh commented on a change in pull request #29367:
URL: https://github.com/apache/spark/pull/29367#discussion_r468882865



##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +450,88 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def minRegisteredRatio: Double = _minRegisteredRatio
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
+   */
+  override def decommissionExecutors(
+      executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    // If we don't want to replace the executors we are decommissioning
+    if (adjustTargetNumExecutors) {
+      adjustExecutors(executorsToDecommission.map(_._1))
+    }
+
+    val decommissioned = executorsToDecommission.filter { case (executorId, decomInfo) =>
+      doDecommission(executorId, decomInfo)
+    }.map(_._1)
+    decommissioned
+  }
+
+
+  private def doDecommission(executorId: String,
+      decomInfo: ExecutorDecommissionInfo): Boolean = {
+
+    logInfo(s"Asking executor $executorId to decommissioning.")
+    try {
+      scheduler.executorDecommission(executorId, decomInfo)
+      if (driverEndpoint != null) {
+        logInfo("Propagating executor decommission to driver.")
+        driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))

Review comment:
       I believe there is still a cycle here. Please trace through how the `DecommissionExecutor` message is handled: it will eventually call this `doDecommission`, which will send the message again ... If I understand correctly, this may end up live-locking the driver until the poor executor actually dies for good.
   
   One way to break this cycle is to call `doDecommission` directly in the handlers for `DecommissionExecutor` in this class's `receive` and `receiveAndReply` methods, with a special flag that forbids re-sending the message; a rough sketch is below.
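   Roughly what I have in mind, as a sketch only (the `notifyDriverEndpoint` parameter name and the error handling are mine, not something this PR has to use verbatim):

```scala
// Sketch: thread a flag through doDecommission so that handling a
// DecommissionExecutor message can never trigger another send of it.
// Assumes the enclosing CoarseGrainedSchedulerBackend's scheduler,
// driverEndpoint, and Logging members; NonFatal is scala.util.control.NonFatal.
private def doDecommission(
    executorId: String,
    decomInfo: ExecutorDecommissionInfo,
    notifyDriverEndpoint: Boolean): Boolean = {
  logInfo(s"Asking executor $executorId to decommission.")
  try {
    scheduler.executorDecommission(executorId, decomInfo)
    // Only forward to the driver endpoint when this call did NOT originate
    // from a DecommissionExecutor message, which breaks the cycle.
    if (notifyDriverEndpoint && driverEndpoint != null) {
      logInfo("Propagating executor decommission to driver.")
      driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))
    }
    true
  } catch {
    case NonFatal(e) =>
      // Placeholder error handling; keep whatever the method currently does.
      logError(s"Unexpected error while decommissioning executor $executorId", e)
      false
  }
}
```

   The handlers for `DecommissionExecutor` in `receive`/`receiveAndReply` would then call `doDecommission(executorId, decomInfo, notifyDriverEndpoint = false)`, while the public `decommissionExecutors` path passes `true`.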
   
   [blocker]

##########
File path: core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
##########
@@ -242,8 +242,10 @@ class DecommissionWorkerSuite
       assert(jobResult === 2)
     }
    // 6 tasks: 2 from first stage, 2 rerun again from first stage, 2nd stage attempt 1 and 2.
-    val tasksSeen = listener.getTasksFinished()

Review comment:
       Would you happen to recall the GitHub Actions error you got that led to this change? I would like to dig further, because I invoke the listener using `TestUtils.withListener(sc, listener)`, which waits for the listener to drain and also removes the listener.
   
   So I don't think wrapping this in an `eventually` should actually be doing anything: the listener has already been removed. Perhaps I ought to bring back the "waiting for job done" inside of `getTasksFinished`, or as a separate call; see the sketch below.
   
   I would like to understand further just so that I can learn about some of 
the gotchas with this listener stuff.
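   For reference, the "waiting for job done" shape I have in mind is roughly the following (the class, field, and method names are illustrative, not the suite's actual listener):

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerTaskEnd}

// Illustrative listener: records finished tasks and exposes an explicit wait
// for job completion, instead of relying on an eventually{} around the caller.
class TasksFinishedListener extends SparkListener {
  private val jobDone = new CountDownLatch(1)
  @volatile private var tasks = Set.empty[String]

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    tasks += s"${taskEnd.stageId}.${taskEnd.stageAttemptId}.${taskEnd.taskInfo.index}"
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = jobDone.countDown()

  // Blocks until the job-end event has been delivered, so callers never read a
  // partially populated task set, even after the listener has been removed.
  def getTasksFinished(timeoutSeconds: Long = 30): Set[String] = {
    assert(jobDone.await(timeoutSeconds, TimeUnit.SECONDS), "Timed out waiting for job to end")
    tasks
  }
}
```

   With the wait inside `getTasksFinished`, the `eventually` wrapper around the assertion should be unnecessary.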
