[ 
https://issues.apache.org/jira/browse/YARN-10781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350214#comment-17350214
 ] 

Xiping Zhang commented on YARN-10781:
-------------------------------------

Sorry for the late reply.
{code:java}
// ExecutorAllocationManager.scala

/**
 * Register for scheduler callbacks to decide when to add and remove executors,
 * and start the scheduling task.
 */
def start(): Unit = {
  listenerBus.addToManagementQueue(listener)

  val scheduleTask = new Runnable() {
    override def run(): Unit = {
      try {
        schedule()
      } catch {
        // Control throwables (non-local returns/breaks) must propagate untouched.
        case ct: ControlThrowable =>
          throw ct
        // Log and swallow anything else so one failed cycle does not
        // cancel the periodic scheduling task.
        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }
  // Re-run schedule() every intervalMillis, starting immediately (initial delay 0).
  executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)

  // Sync the initial executor target with the cluster manager.
  client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
}
{code}
The schedule method is executed periodically at a fixed interval.

 

 
{code:java}
/**
 * This is called at a fixed interval to regulate the number of pending executor requests
 * and number of executors running.
 *
 * First, adjust our requested executors based on the add time and our current needs.
 * Then, if the remove time for an existing executor has expired, kill the executor.
 *
 * This is factored out into its own method for testing.
 */
private def schedule(): Unit = synchronized {
  val now = clock.getTimeMillis

  // Collect executors whose idle deadline has passed, dropping them from
  // `removeTimes` in the same pass (retain keeps only non-expired entries).
  val executorIdsToBeRemoved = ArrayBuffer[String]()
  removeTimes.retain { case (executorId, expireTime) =>
    val expired = now >= expireTime
    if (expired) {
      initializing = false
      executorIdsToBeRemoved += executorId
    }
    !expired
  }
  // Update executor target number only after initializing flag is unset
  updateAndSyncNumExecutorsTarget(now)
  if (executorIdsToBeRemoved.nonEmpty) {
    removeExecutors(executorIdsToBeRemoved)
  }
}


{code}
This then requests removal of the executors collected in the executorIdsToBeRemoved buffer.

 
{code:java}
/**
 * Request the cluster manager to remove the given executors.
 * Returns the list of executors which are removed.
 */
private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized {
  val executorIdsToBeRemoved = new ArrayBuffer[String]

  logInfo("Request to remove executorIds: " + executors.mkString(", "))
  // Executors already pending removal no longer count toward the live total.
  val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size

  var newExecutorTotal = numExistingExecutors
  executors.foreach { executorIdToBeRemoved =>
    // Never shrink below the configured minimum or below the current target.
    if (newExecutorTotal - 1 < minNumExecutors) {
      logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
        s"$newExecutorTotal executor(s) left (minimum number of executor limit $minNumExecutors)")
    } else if (newExecutorTotal - 1 < numExecutorsTarget) {
      logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
        s"$newExecutorTotal executor(s) left (number of executor target $numExecutorsTarget)")
    } else if (canBeKilled(executorIdToBeRemoved)) {
      executorIdsToBeRemoved += executorIdToBeRemoved
      newExecutorTotal -= 1
    }
  }

  if (executorIdsToBeRemoved.isEmpty) {
    return Seq.empty[String]
  }

  // Send a request to the backend to kill this executor(s)
  val executorsRemoved = if (testing) {
    executorIdsToBeRemoved
  } else {
    // We don't want to change our target number of executors, because we already did that
    // when the task backlog decreased.
    client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false,
      countFailures = false, force = false)
  }
  // [SPARK-21834] killExecutors api reduces the target number of executors.
  // So we need to update the target with desired value.
  client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
  // reset the newExecutorTotal to the existing number of executors
  newExecutorTotal = numExistingExecutors
  if (testing || executorsRemoved.nonEmpty) {
    executorsRemoved.foreach { removedExecutorId =>
      // If it is a cached block, it uses cachedExecutorIdleTimeoutS for timeout
      val idleTimeout = if (blockManagerMaster.hasCachedBlocks(removedExecutorId)) {
        cachedExecutorIdleTimeoutS
      } else {
        executorIdleTimeoutS
      }
      newExecutorTotal -= 1
      logInfo(s"Removing executor $removedExecutorId because it has been idle for " +
        s"$idleTimeout seconds (new desired total will be $newExecutorTotal)")
      executorsPendingToRemove.add(removedExecutorId)
    }
    executorsRemoved
  } else {
    logWarning(s"Unable to reach the cluster manager to kill executor/s " +
      s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible to kill!")
    Seq.empty[String]
  }
}
{code}
 

 

 

 

> The Thread of the NM aggregate log is exhausted and no other Application can 
> aggregate the log
> ----------------------------------------------------------------------------------------------
>
>                 Key: YARN-10781
>                 URL: https://issues.apache.org/jira/browse/YARN-10781
>             Project: Hadoop YARN
>          Issue Type: Bug
>          Components: yarn
>    Affects Versions: 2.9.2, 3.3.0
>            Reporter: Xiping Zhang
>            Priority: Major
>
> We observed more than 100 applications running on one NM. Most of these 
> applications are SparkStreaming tasks, but these applications do not have 
> running Containers. When an offline application running on the NM finishes, 
> its log cannot be uploaded to HDFS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org

Reply via email to