GitHub user ajbozarth commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18887#discussion_r132770167
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -422,208 +454,101 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           }
         }
     
    -    applications.get(appId) match {
    -      case Some(appInfo) =>
    -        try {
    -          // If no attempt is specified, or there is no attemptId for attempts, return all attempts
    -          appInfo.attempts.filter { attempt =>
    -            attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
    -          }.foreach { attempt =>
    -            val logPath = new Path(logDir, attempt.logPath)
    -            zipFileToStream(logPath, attempt.logPath, zipStream)
    -          }
    -        } finally {
    -          zipStream.close()
    +    val app = try {
    +      load(appId)
    +    } catch {
    +      case _: NoSuchElementException =>
    +        throw new SparkException(s"Logs for $appId not found.")
    +    }
    +
    +    try {
    +      // If no attempt is specified, or there is no attemptId for attempts, return all attempts
    +      attemptId
    +        .map { id => app.attempts.filter(_.info.attemptId == Some(id)) }
    +        .getOrElse(app.attempts)
    +        .map(_.logPath)
    +        .foreach { log =>
    +          zipFileToStream(new Path(logDir, log), log, zipStream)
             }
    -      case None => throw new SparkException(s"Logs for $appId not found.")
    +    } finally {
    +      zipStream.close()
         }
       }
     
       /**
    -   * Replay the log files in the list and merge the list of old applications with new ones
    +   * Replay the given log file, saving the application in the listing db.
        */
       protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
    -    val newAttempts = try {
    -      val eventsFilter: ReplayEventsFilter = { eventString =>
    -        eventString.startsWith(APPL_START_EVENT_PREFIX) ||
    -          eventString.startsWith(APPL_END_EVENT_PREFIX) ||
    -          eventString.startsWith(LOG_START_EVENT_PREFIX)
    -      }
    -
    -      val logPath = fileStatus.getPath()
    -      val appCompleted = isApplicationCompleted(fileStatus)
    -
    -      // Use loading time as lastUpdated since some filesystems don't update modifiedTime
    -      // each time file is updated. However use modifiedTime for completed jobs so lastUpdated
    -      // won't change whenever HistoryServer restarts and reloads the file.
    -      val lastUpdated = if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis()
    -
    -      val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter)
    -
    -      // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
    -      // try to show their UI.
    -      if (appListener.appId.isDefined) {
    -        val attemptInfo = new FsApplicationAttemptInfo(
    -          logPath.getName(),
    -          appListener.appName.getOrElse(NOT_STARTED),
    -          appListener.appId.getOrElse(logPath.getName()),
    -          appListener.appAttemptId,
    -          appListener.startTime.getOrElse(-1L),
    -          appListener.endTime.getOrElse(-1L),
    -          lastUpdated,
    -          appListener.sparkUser.getOrElse(NOT_STARTED),
    -          appCompleted,
    -          fileStatus.getLen(),
    -          appListener.appSparkVersion.getOrElse("")
    -        )
    -        fileToAppInfo.put(logPath, attemptInfo)
    -        logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo")
    -        Some(attemptInfo)
    -      } else {
    -        logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
    -          "The application may have not started.")
    -        None
    -      }
    -
    -    } catch {
    -      case e: Exception =>
    -        logError(
    -          s"Exception encountered when attempting to load application log ${fileStatus.getPath}",
    -          e)
    -        None
    -    }
    -
    -    if (newAttempts.isEmpty) {
    -      return
    +    val eventsFilter: ReplayEventsFilter = { eventString =>
    +      eventString.startsWith(APPL_START_EVENT_PREFIX) ||
    +        eventString.startsWith(APPL_END_EVENT_PREFIX) ||
    +        eventString.startsWith(LOG_START_EVENT_PREFIX)
         }
     
    -    // Build a map containing all apps that contain new attempts. The app information in this map
    -    // contains both the new app attempt, and those that were already loaded in the existing apps
    -    // map. If an attempt has been updated, it replaces the old attempt in the list.
    -    val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()
    -
    -    applications.synchronized {
    -      newAttempts.foreach { attempt =>
    -        val appInfo = newAppMap.get(attempt.appId)
    -          .orElse(applications.get(attempt.appId))
    -          .map { app =>
    -            val attempts =
    -              app.attempts.filter(_.attemptId != attempt.attemptId) ++ List(attempt)
    -            new FsApplicationHistoryInfo(attempt.appId, attempt.name,
    -              attempts.sortWith(compareAttemptInfo))
    -          }
    -          .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
    -        newAppMap(attempt.appId) = appInfo
    -      }
    -
    -      // Merge the new app list with the existing one, maintaining the expected ordering (descending
    -      // end time). Maintaining the order is important to avoid having to sort the list every time
    -      // there is a request for the log list.
    -      val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
    -      val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
    -      def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
    -        if (!mergedApps.contains(info.id)) {
    -          mergedApps += (info.id -> info)
    -        }
    -      }
    +    val logPath = fileStatus.getPath()
    +    logInfo(s"Replaying log path: $logPath")
     
    -      val newIterator = newApps.iterator.buffered
    -      val oldIterator = applications.values.iterator.buffered
    -      while (newIterator.hasNext && oldIterator.hasNext) {
    -        if (newAppMap.contains(oldIterator.head.id)) {
    -          oldIterator.next()
    -        } else if (compareAppInfo(newIterator.head, oldIterator.head)) {
    -          addIfAbsent(newIterator.next())
    -        } else {
    -          addIfAbsent(oldIterator.next())
    -        }
    -      }
    -      newIterator.foreach(addIfAbsent)
    -      oldIterator.foreach(addIfAbsent)
    +    val bus = new ReplayListenerBus()
    +    val listener = new AppListingListener(fileStatus, clock)
    +    bus.addListener(listener)
     
    -      applications = mergedApps
    -    }
    +    replay(fileStatus, isApplicationCompleted(fileStatus), bus, eventsFilter)
    +    listener.applicationInfo.foreach(addListing)
    +    listing.write(new LogInfo(logPath.toString(), fileStatus.getLen()))
       }
     
       /**
        * Delete event logs from the log directory according to the clean policy defined by the user.
        */
       private[history] def cleanLogs(): Unit = {
    +    var iterator: Option[KVStoreIterator[ApplicationInfoWrapper]] = None
         try {
    -      val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000
    -
    -      val now = clock.getTimeMillis()
    -      val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
    -
    -      def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = {
    -        now - attempt.lastUpdated > maxAge
    -      }
    -
    -      // Scan all logs from the log directory.
    -      // Only completed applications older than the specified max age will be deleted.
    -      applications.values.foreach { app =>
    -        val (toClean, toRetain) = app.attempts.partition(shouldClean)
    -        attemptsToClean ++= toClean
    -
    -        if (toClean.isEmpty) {
    -          appsToRetain += (app.id -> app)
    -        } else if (toRetain.nonEmpty) {
    -          appsToRetain += (app.id ->
    -            new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList))
    +      val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
    +
    +      // Iterate descending over all applications whose oldest attempt is older than the maxAge.
    --- End diff --
    
    Nit: in this comment, `maxAge` -> `maxTime`, so that it matches the `maxTime` cutoff variable defined just above it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to