[ 
https://issues.apache.org/jira/browse/YARN-10781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17350226#comment-17350226
 ] 

Xiping Zhang commented on YARN-10781:
-------------------------------------

Aggregated logs are handled by NM.When NM initializes an Application, it 
allocates a thread to do the aggregate logging for that application.Here's the 
code on the NM side.

 

 

 
{code:java}
@SuppressWarnings("unchecked")
private void initApp(final ApplicationId appId, String user,
    Credentials credentials, Map<ApplicationAccessType, String> appAcls,
    LogAggregationContext logAggregationContext,
    long recoveredLogInitedTime) {
  ApplicationEvent eventResponse;
  try {
    initAppAggregator(appId, user, credentials, appAcls,
        logAggregationContext, recoveredLogInitedTime);
    eventResponse = new ApplicationEvent(appId,
        ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
  } catch (YarnRuntimeException e) {
    LOG.warn("Application failed to init aggregation", e);
    eventResponse = new ApplicationEvent(appId,
        ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
  }
  this.dispatcher.getEventHandler().handle(eventResponse);
}
{code}
{code:java}
protected void initAppAggregator(final ApplicationId appId, String user,
    Credentials credentials, Map<ApplicationAccessType, String> appAcls,
    LogAggregationContext logAggregationContext,
    long recoveredLogInitedTime) {
 ...

  final AppLogAggregator appLogAggregator =
      new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
          getConfig(), appId, userUgi, this.nodeId, dirsHandler,
          logAggregationFileController.getRemoteNodeLogFileForApp(appId,
          user, nodeId), appAcls, logAggregationContext, this.context,
          getLocalFileContext(getConfig()), this.rollingMonitorInterval,
          recoveredLogInitedTime, logAggregationFileController);
  ...

  // Schedule the aggregator.
  Runnable aggregatorWrapper = new Runnable() {
    public void run() {
      try {
        appLogAggregator.run();
      } finally {
        appLogAggregators.remove(appId);
        closeFileSystems(userUgi);
      }
    }
  };
  this.threadPool.execute(aggregatorWrapper);
  if (appDirException != null) {
    throw appDirException;
  }
}
{code}
{code:java}
// AppLogAggregatorImpl.java

@Override
public void run() {
  try {
    doAppLogAggregation();
  }
 ... 
}

{code}
 
{code:java}
//
private void doAppLogAggregation() throws LogAggregationDFSException {
  while (!this.appFinishing.get() && !this.aborted.get()) {
    synchronized(this) {
      try {
        waiting.set(true);
        if (logControllerContext.isLogAggregationInRolling()) {
          wait(logControllerContext.getRollingMonitorInterval() * 1000);
          if (this.appFinishing.get() || this.aborted.get()) {
            break;
          }
          uploadLogsForContainers(false);
        } else {
          wait(THREAD_SLEEP_TIME);
        }
      } catch (InterruptedException e) {
        LOG.warn("PendingContainers queue is interrupted");
        this.appFinishing.set(true);
      } catch (LogAggregationDFSException e) {
        this.appFinishing.set(true);
        throw e;
      }
    }
  }

  if (this.aborted.get()) {
    return;
  }

  try {
    // App is finished, upload the container logs.
    uploadLogsForContainers(true);

    doAppLogAggregationPostCleanUp();
  } catch (LogAggregationDFSException e) {
    LOG.error("Error during log aggregation", e);
  }

  this.dispatcher.getEventHandler().handle(
      new ApplicationEvent(this.appId,
          ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
  this.appAggregationFinished.set(true);
}

{code}
When handling the APPLICATION_STARTED event at NM, NM initializes the App and 
initializes an ApplogAggregatorImpl to handle the log aggregation, which 
allocates a thread to run its run method.Inside this is a loop until the task 
is Finished or aborted. 

 

 

 

 

 

 

> The Thread of the NM aggregate log is exhausted and no other Application can 
> aggregate the log
> ----------------------------------------------------------------------------------------------
>
>                 Key: YARN-10781
>                 URL: https://issues.apache.org/jira/browse/YARN-10781
>             Project: Hadoop YARN
>          Issue Type: Bug
>          Components: yarn
>    Affects Versions: 2.9.2, 3.3.0
>            Reporter: Xiping Zhang
>            Priority: Major
>
> We observed more than 100 applications running on one NM.Most of these 
> applications are SparkStreaming tasks, but these applications do not have 
> running Containers.When the offline application running on it finishes, the 
> log cannot be reported to HDFS.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org

Reply via email to