[ https://issues.apache.org/jira/browse/YARN-10781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350214#comment-17350214 ]
Xiping Zhang commented on YARN-10781: ------------------------------------- Sorry for the late reply. {code:java} // ExecutorAllocationManager.scala /** * Register for scheduler callbacks to decide when to add and remove executors, and start * the scheduling task. */ def start(): Unit = { listenerBus.addToManagementQueue(listener) val scheduleTask = new Runnable() { override def run(): Unit = { try { schedule() } catch { case ct: ControlThrowable => throw ct case t: Throwable => logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t) } } } executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS) client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount) } {code} The schedule method is executed periodically. {code:java} /** * This is called at a fixed interval to regulate the number of pending executor requests * and number of executors running. * * First, adjust our requested executors based on the add time and our current needs. * Then, if the remove time for an existing executor has expired, kill the executor. * * This is factored out into its own method for testing. */ private def schedule(): Unit = synchronized { val now = clock.getTimeMillis val executorIdsToBeRemoved = ArrayBuffer[String]() removeTimes.retain { case (executorId, expireTime) => val expired = now >= expireTime if (expired) { initializing = false executorIdsToBeRemoved += executorId } !expired } // Update executor target number only after initializing flag is unset updateAndSyncNumExecutorsTarget(now) if (executorIdsToBeRemoved.nonEmpty) { removeExecutors(executorIdsToBeRemoved) } } {code} This then removes the executors collected in the executorIdsToBeRemoved buffer. {code:java} /** * Request the cluster manager to remove the given executors. * Returns the list of executors which are removed. 
*/ private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized { val executorIdsToBeRemoved = new ArrayBuffer[String] logInfo("Request to remove executorIds: " + executors.mkString(", ")) val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size var newExecutorTotal = numExistingExecutors executors.foreach { executorIdToBeRemoved => if (newExecutorTotal - 1 < minNumExecutors) { logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " + s"$newExecutorTotal executor(s) left (minimum number of executor limit $minNumExecutors)") } else if (newExecutorTotal - 1 < numExecutorsTarget) { logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " + s"$newExecutorTotal executor(s) left (number of executor target $numExecutorsTarget)") } else if (canBeKilled(executorIdToBeRemoved)) { executorIdsToBeRemoved += executorIdToBeRemoved newExecutorTotal -= 1 } } if (executorIdsToBeRemoved.isEmpty) { return Seq.empty[String] } // Send a request to the backend to kill this executor(s) val executorsRemoved = if (testing) { executorIdsToBeRemoved } else { // We don't want to change our target number of executors, because we already did that // when the task backlog decreased. client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false, countFailures = false, force = false) } // [SPARK-21834] killExecutors api reduces the target number of executors. // So we need to update the target with desired value. 
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount) // reset the newExecutorTotal to the existing number of executors newExecutorTotal = numExistingExecutors if (testing || executorsRemoved.nonEmpty) { executorsRemoved.foreach { removedExecutorId => // If it is a cached block, it uses cachedExecutorIdleTimeoutS for timeout val idleTimeout = if (blockManagerMaster.hasCachedBlocks(removedExecutorId)) { cachedExecutorIdleTimeoutS } else { executorIdleTimeoutS } newExecutorTotal -= 1 logInfo(s"Removing executor $removedExecutorId because it has been idle for " + s"$idleTimeout seconds (new desired total will be $newExecutorTotal)") executorsPendingToRemove.add(removedExecutorId) } executorsRemoved } else { logWarning(s"Unable to reach the cluster manager to kill executor/s " + s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible to kill!") Seq.empty[String] } } {code} > The Thread of the NM aggregate log is exhausted and no other Application can > aggregate the log > ---------------------------------------------------------------------------------------------- > > Key: YARN-10781 > URL: https://issues.apache.org/jira/browse/YARN-10781 > Project: Hadoop YARN > Issue Type: Bug > Components: yarn > Affects Versions: 2.9.2, 3.3.0 > Reporter: Xiping Zhang > Priority: Major > > We observed more than 100 applications running on one NM.Most of these > applications are SparkStreaming tasks, but these applications do not have > running Containers.When the offline application running on it finishes, the > log cannot be reported to HDFS. -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org