GitHub user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9571#discussion_r111942037
  
    --- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -310,77 +338,87 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
        * applications that haven't been updated since last time the logs were 
checked.
        */
       private[history] def checkForLogs(): Unit = {
    -    try {
    -      val newLastScanTime = getNewLastScanTime()
    -      logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
    -      val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
    -        .getOrElse(Seq[FileStatus]())
    -      // scan for modified applications, replay and merge them
    -      val logInfos: Seq[FileStatus] = statusList
    -        .filter { entry =>
    -          try {
    -            val prevFileSize = 
fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
    -            !entry.isDirectory() &&
    -              // FsHistoryProvider generates a hidden file which can't be 
read.  Accidentally
    -              // reading a garbage file is safe, but we would log an error 
which can be scary to
    -              // the end-user.
    -              !entry.getPath().getName().startsWith(".") &&
    -              prevFileSize < entry.getLen()
    -          } catch {
    -            case e: AccessControlException =>
    -              // Do not use "logInfo" since these messages can get pretty 
noisy if printed on
    -              // every poll.
    -              logDebug(s"No permission to read $entry, ignoring.")
    -              false
    +    metrics.updateCount.inc()
    +    metrics.updateLastAttempted.touch()
    +    time(metrics.updateTimer) {
    +      try {
    +        val newLastScanTime = getNewLastScanTime()
    +        logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
    +        val statusList = Option(fs.listStatus(new 
Path(logDir))).map(_.toSeq)
    +          .getOrElse(Seq[FileStatus]())
    +        // scan for modified applications, replay and merge them
    +        val logInfos: Seq[FileStatus] = statusList
    +          .filter { entry =>
    +            try {
    +              val prevFileSize = 
fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
    +              !entry.isDirectory() &&
    +                // FsHistoryProvider generates a hidden file which can't 
be read.  Accidentally
    +                // reading a garbage file is safe, but we would log an 
error which can be scary to
    +                // the end-user.
    +                !entry.getPath().getName().startsWith(".") &&
    +                prevFileSize < entry.getLen()
    +            } catch {
    +              case e: AccessControlException =>
    +                // Do not use "logInfo" since these messages can get 
pretty noisy if printed on
    +                // every poll.
    +                logDebug(s"No permission to read $entry, ignoring.")
    +                false
    +            }
    +          }
    +          .flatMap { entry => Some(entry) }
    +          .sortWith { case (entry1, entry2) =>
    +            entry1.getModificationTime() >= entry2.getModificationTime()
               }
    -        }
    -        .flatMap { entry => Some(entry) }
    -        .sortWith { case (entry1, entry2) =>
    -          entry1.getModificationTime() >= entry2.getModificationTime()
    -      }
     
           if (logInfos.nonEmpty) {
             logDebug(s"New/updated attempts found: ${logInfos.size} 
${logInfos.map(_.getPath)}")
           }
     
    -      var tasks = mutable.ListBuffer[Future[_]]()
    -
    -      try {
    -        for (file <- logInfos) {
    -          tasks += replayExecutor.submit(new Runnable {
    -            override def run(): Unit = mergeApplicationListing(file)
    -          })
    -        }
    -      } catch {
    -        // let the iteration over logInfos break, since an exception on
    -        // replayExecutor.submit (..) indicates the ExecutorService is 
unable
    -        // to take any more submissions at this time
    -
    -        case e: Exception =>
    -          logError(s"Exception while submitting event log for replay", e)
    -      }
    -
    -      pendingReplayTasksCount.addAndGet(tasks.size)
    +        var tasks = mutable.ListBuffer[Future[_]]()
     
    -      tasks.foreach { task =>
             try {
    -          // Wait for all tasks to finish. This makes sure that 
checkForLogs
    -          // is not scheduled again while some tasks are already running in
    -          // the replayExecutor.
    -          task.get()
    +          for (file <- logInfos) {
    +            tasks += replayExecutor.submit(new Runnable {
    +              override def run(): Unit =
    +                time(metrics.historyMergeTimer, 
Some(metrics.historyTotalMergeTime)) {
    +                  mergeApplicationListing(file)
    +              }
    +            })
    +          }
             } catch {
    -          case e: InterruptedException =>
    -            throw e
    +          // let the iteration over logInfos break, since an exception on
    +          // replayExecutor.submit (..) indicates the ExecutorService is 
unable
    +          // to take any more submissions at this time
    +
               case e: Exception =>
    -            logError("Exception while merging application listings", e)
    -        } finally {
    -          pendingReplayTasksCount.decrementAndGet()
    +            logError(s"Exception while submitting event log for replay", e)
             }
    -      }
     
    -      lastScanTime.set(newLastScanTime)
    -    } catch {
    -      case e: Exception => logError("Exception in checking for event log 
updates", e)
    +        pendingReplayTasksCount.addAndGet(tasks.size)
    +
    +        tasks.foreach { task =>
    +          try {
    +            // Wait for all tasks to finish. This makes sure that 
checkForLogs
    +            // is not scheduled again while some tasks are already running 
in
    +            // the replayExecutor.
    +            task.get()
    +          } catch {
    +            case e: InterruptedException =>
    +              throw e
    +            case e: Exception =>
    +              logError("Exception while merging application listings", e)
    +          } finally {
    +            pendingReplayTasksCount.decrementAndGet()
    +          }
    +        }
    +
    +        lastScanTime.set(newLastScanTime)
    +        metrics.updateLastSucceeded.setValue(newLastScanTime)
    +      } catch {
    +        case e: Exception => logError(
    --- End diff --
    
    done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to