Github user brad-kaiser commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19041#discussion_r155031960
  
    --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
    @@ -416,63 +423,52 @@ private[spark] class ExecutorAllocationManager(
        * Request the cluster manager to remove the given executors.
        * Returns the list of executors which are removed.
        */
    -  private def removeExecutors(executors: Seq[String]): Seq[String] = 
synchronized {
    -    val executorIdsToBeRemoved = new ArrayBuffer[String]
    -
    +  private def removeExecutors(executors: Seq[String]): Unit = synchronized 
{
         logInfo("Request to remove executorIds: " + executors.mkString(", "))
    -    val numExistingExecutors = allocationManager.executorIds.size - 
executorsPendingToRemove.size
    -
    -    var newExecutorTotal = numExistingExecutors
    -    executors.foreach { executorIdToBeRemoved =>
    -      if (newExecutorTotal - 1 < minNumExecutors) {
    -        logDebug(s"Not removing idle executor $executorIdToBeRemoved 
because there are only " +
    -          s"$newExecutorTotal executor(s) left (minimum number of executor 
limit $minNumExecutors)")
    -      } else if (newExecutorTotal - 1 < numExecutorsTarget) {
    -        logDebug(s"Not removing idle executor $executorIdToBeRemoved 
because there are only " +
    -          s"$newExecutorTotal executor(s) left (number of executor target 
$numExecutorsTarget)")
    -      } else if (canBeKilled(executorIdToBeRemoved)) {
    -        executorIdsToBeRemoved += executorIdToBeRemoved
    -        newExecutorTotal -= 1
    -      }
    -    }
    +    val numExistingExecs = allocationManager.executorIds.size - 
executorsPendingToRemove.size
    +    val execCountFloor = math.max(minNumExecutors, numExecutorsTarget)
    +    val (executorIdsToBeRemoved, dontRemove) = executors
    +      .filter(canBeKilled)
    +      .splitAt(numExistingExecs - execCountFloor)
     
    -    if (executorIdsToBeRemoved.isEmpty) {
    -      return Seq.empty[String]
    +    dontRemove.foreach { execId =>
    +      logDebug(s"Not removing idle executor $execId because it " +
    +        s"would put us below the minimum limit of $minNumExecutors 
executors" +
    +        s"or number of target executors $numExecutorsTarget")
         }
     
    -    // Send a request to the backend to kill this executor(s)
    -    val executorsRemoved = if (testing) {
    -      executorIdsToBeRemoved
    +    if (executorIdsToBeRemoved.isEmpty) {
    +      Seq.empty[String]
    +    } else if (testing) {
    +      recordExecutorKill(executorIdsToBeRemoved)
    +    } else if (recoverCachedData) {
    +      logDebug(s"Starting replicate process for $executorIdsToBeRemoved")
    +      client.markPendingToRemove(executorIdsToBeRemoved)
    +      recordExecutorKill(executorIdsToBeRemoved)
    +      cacheRecoveryManager.startExecutorKill(executorIdsToBeRemoved)
         } else {
    -      client.killExecutors(executorIdsToBeRemoved)
    +      val killed = killExecutors(executorIdsToBeRemoved)
    +      recordExecutorKill(killed)
         }
    -    // [SPARK-21834] killExecutors api reduces the target number of 
executors.
    -    // So we need to update the target with desired value.
    -    client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, 
hostToLocalTaskCount)
    -    // reset the newExecutorTotal to the existing number of executors
    -    newExecutorTotal = numExistingExecutors
    -    if (testing || executorsRemoved.nonEmpty) {
    -      executorsRemoved.foreach { removedExecutorId =>
    -        newExecutorTotal -= 1
    -        logInfo(s"Removing executor $removedExecutorId because it has been 
idle for " +
    -          s"$executorIdleTimeoutS seconds (new desired total will be 
$newExecutorTotal)")
    -        executorsPendingToRemove.add(removedExecutorId)
    -      }
    -      executorsRemoved
    -    } else {
    +  }
    +
    +  def killExecutors(executorIds: Seq[String], forceIfPending: Boolean = 
false): Seq[String] = {
    +    logDebug(s"Starting kill process for $executorIds")
    +    val result = client.killExecutors(executorIds, forceIfPending = 
forceIfPending)
    +    if (result.isEmpty) {
           logWarning(s"Unable to reach the cluster manager to kill executor/s 
" +
    -        s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible 
to kill!")
    -      Seq.empty[String]
    +        s"${executorIds.mkString(",")} or no executor eligible to kill!")
         }
    +    result
       }
     
    -  /**
    -   * Request the cluster manager to remove the given executor.
    -   * Return whether the request is acknowledged.
    -   */
    -  private def removeExecutor(executorId: String): Boolean = synchronized {
    -    val executorsRemoved = removeExecutors(Seq(executorId))
    -    executorsRemoved.nonEmpty && executorsRemoved(0) == executorId
    +  private def recordExecutorKill(executorsRemoved: Seq[String]): Unit = 
synchronized {
    +    // [SPARK-21834] killExecutors api reduces the target number of 
executors.
    +    // So we need to update the target with desired value.
    +    client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, 
hostToLocalTaskCount)
    +    executorsPendingToRemove ++= executorsRemoved
    +    logInfo(s"Removing executor $executorsRemoved because it has been idle 
for " +
    --- End diff --
    
    fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to