Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20138#discussion_r161095561
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -405,49 +404,70 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         try {
           val newLastScanTime = getNewLastScanTime()
           logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
    -      // scan for modified applications, replay and merge them
    -      val logInfos = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
    +
    +      val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
             .filter { entry =>
               !entry.isDirectory() &&
                 // FsHistoryProvider generates a hidden file which can't be read.  Accidentally
                 // reading a garbage file is safe, but we would log an error which can be scary to
                 // the end-user.
                 !entry.getPath().getName().startsWith(".") &&
    -            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) &&
    -            recordedFileSize(entry.getPath()) < entry.getLen()
    +            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
    +        }
    +        .filter { entry =>
    +          try {
    +            val info = listing.read(classOf[LogInfo], entry.getPath().toString())
    +            if (info.fileSize < entry.getLen()) {
    +              // Log size has changed, it should be parsed.
    +              true
    +            } else {
    +              // If the SHS view has a valid application, update the time the file was last seen so
    +              // that the entry is not deleted from the SHS listing.
    +              if (info.appId.isDefined) {
    +                listing.write(info.copy(lastProcessed = newLastScanTime))
    +              }
    +              false
    +            }
    +          } catch {
    +            case _: NoSuchElementException =>
    +              // If the file is currently not being tracked by the SHS, add an entry for it and try
    +              // to parse it. This will allow the cleaner code to detect the file as stale later on
    +              // if it was not possible to parse it.
    +              listing.write(LogInfo(entry.getPath().toString(), newLastScanTime, None, None,
    +                entry.getLen()))
    +              entry.getLen() > 0
    +          }
             }
             .sortWith { case (entry1, entry2) =>
               entry1.getModificationTime() > entry2.getModificationTime()
             }
     
    -      if (logInfos.nonEmpty) {
    -        logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
    +      if (updated.nonEmpty) {
    +        logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}")
           }
     
    -      var tasks = mutable.ListBuffer[Future[_]]()
    -
    -      try {
    -        for (file <- logInfos) {
    -          tasks += replayExecutor.submit(new Runnable {
    -            override def run(): Unit = mergeApplicationListing(file)
    +      val tasks = updated.map { entry =>
    +        try {
    +          replayExecutor.submit(new Runnable {
    +            override def run(): Unit = mergeApplicationListing(entry, newLastScanTime)
               })
    +        } catch {
    +          // let the iteration over logInfos break, since an exception on
    --- End diff --
    
    And actually, you've moved the try/catch, so this comment is no longer true: you'll continue to submit all tasks if one throws an exception. (I guess I'm not really sure why the old code did it that way ...)
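    
    To make the behavioral difference concrete, here is a minimal, self-contained sketch. It is not the FsHistoryProvider code; the object name and the shut-down executor are just assumptions used to force submit() to fail. With the try around the whole loop, the first RejectedExecutionException breaks the iteration; with the try inside the per-entry closure, the remaining entries are still attempted:
    
        import java.util.concurrent.{Executors, ExecutorService, Future, RejectedExecutionException}
        import scala.collection.mutable
    
        object TryCatchPlacementSketch {
          def main(args: Array[String]): Unit = {
            val executor: ExecutorService = Executors.newFixedThreadPool(2)
            executor.shutdown()  // make every subsequent submit() throw RejectedExecutionException
    
            val entries = Seq("log-1", "log-2", "log-3")
    
            // Old placement: the try wraps the whole loop, so the first failed submit()
            // aborts the iteration and no further tasks are submitted.
            val oldTasks = mutable.ListBuffer[Future[_]]()
            try {
              for (entry <- entries) {
                oldTasks += executor.submit(new Runnable {
                  override def run(): Unit = println(s"processing $entry")
                })
              }
            } catch {
              case e: RejectedExecutionException =>
                println(s"old placement stopped after ${oldTasks.size} submissions: $e")
            }
    
            // New placement: the try wraps each submit(), so a failure is handled per entry
            // and the iteration keeps going over the remaining entries.
            val newTasks = entries.flatMap { entry =>
              try {
                Some(executor.submit(new Runnable {
                  override def run(): Unit = println(s"processing $entry")
                }))
              } catch {
                case e: RejectedExecutionException =>
                  println(s"new placement skipped $entry: $e")
                  None
              }
            }
            println(s"new placement attempted all ${entries.size} entries, submitted ${newTasks.size}")
          }
        }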


---
