Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/9571#discussion_r111942037

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -310,77 +338,87 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    * applications that haven't been updated since last time the logs were checked.
    */
   private[history] def checkForLogs(): Unit = {
-    try {
-      val newLastScanTime = getNewLastScanTime()
-      logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
-      val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
-        .getOrElse(Seq[FileStatus]())
-      // scan for modified applications, replay and merge them
-      val logInfos: Seq[FileStatus] = statusList
-        .filter { entry =>
-          try {
-            val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
-            !entry.isDirectory() &&
-              // FsHistoryProvider generates a hidden file which can't be read. Accidentally
-              // reading a garbage file is safe, but we would log an error which can be scary to
-              // the end-user.
-              !entry.getPath().getName().startsWith(".") &&
-              prevFileSize < entry.getLen()
-          } catch {
-            case e: AccessControlException =>
-              // Do not use "logInfo" since these messages can get pretty noisy if printed on
-              // every poll.
-              logDebug(s"No permission to read $entry, ignoring.")
-              false
+    metrics.updateCount.inc()
+    metrics.updateLastAttempted.touch()
+    time(metrics.updateTimer) {
+      try {
+        val newLastScanTime = getNewLastScanTime()
+        logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
+        val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
+          .getOrElse(Seq[FileStatus]())
+        // scan for modified applications, replay and merge them
+        val logInfos: Seq[FileStatus] = statusList
+          .filter { entry =>
+            try {
+              val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
+              !entry.isDirectory() &&
+                // FsHistoryProvider generates a hidden file which can't be read. Accidentally
+                // reading a garbage file is safe, but we would log an error which can be scary to
+                // the end-user.
+                !entry.getPath().getName().startsWith(".") &&
+                prevFileSize < entry.getLen()
+            } catch {
+              case e: AccessControlException =>
+                // Do not use "logInfo" since these messages can get pretty noisy if printed on
+                // every poll.
+                logDebug(s"No permission to read $entry, ignoring.")
+                false
+            }
+          }
+          .flatMap { entry => Some(entry) }
+          .sortWith { case (entry1, entry2) =>
+            entry1.getModificationTime() >= entry2.getModificationTime()
           }
-        }
-        .flatMap { entry => Some(entry) }
-        .sortWith { case (entry1, entry2) =>
-          entry1.getModificationTime() >= entry2.getModificationTime()
-        }
 
       if (logInfos.nonEmpty) {
         logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
       }
 
-      var tasks = mutable.ListBuffer[Future[_]]()
-
-      try {
-        for (file <- logInfos) {
-          tasks += replayExecutor.submit(new Runnable {
-            override def run(): Unit = mergeApplicationListing(file)
-          })
-        }
-      } catch {
-        // let the iteration over logInfos break, since an exception on
-        // replayExecutor.submit (..) indicates the ExecutorService is unable
-        // to take any more submissions at this time
-
-        case e: Exception =>
-          logError(s"Exception while submitting event log for replay", e)
-      }
-
-      pendingReplayTasksCount.addAndGet(tasks.size)
+      var tasks = mutable.ListBuffer[Future[_]]()
 
-      tasks.foreach { task =>
       try {
-        // Wait for all tasks to finish. This makes sure that checkForLogs
-        // is not scheduled again while some tasks are already running in
-        // the replayExecutor.
-        task.get()
+        for (file <- logInfos) {
+          tasks += replayExecutor.submit(new Runnable {
+            override def run(): Unit =
+              time(metrics.historyMergeTimer, Some(metrics.historyTotalMergeTime)) {
+                mergeApplicationListing(file)
+              }
+          })
+        }
       } catch {
-        case e: InterruptedException =>
-          throw e
+        // let the iteration over logInfos break, since an exception on
+        // replayExecutor.submit (..) indicates the ExecutorService is unable
+        // to take any more submissions at this time
+
         case e: Exception =>
-          logError("Exception while merging application listings", e)
-      } finally {
-        pendingReplayTasksCount.decrementAndGet()
+          logError(s"Exception while submitting event log for replay", e)
       }
-      }
 
-      lastScanTime.set(newLastScanTime)
-    } catch {
-      case e: Exception => logError("Exception in checking for event log updates", e)
+      pendingReplayTasksCount.addAndGet(tasks.size)
+
+      tasks.foreach { task =>
+        try {
+          // Wait for all tasks to finish. This makes sure that checkForLogs
+          // is not scheduled again while some tasks are already running in
+          // the replayExecutor.
+          task.get()
+        } catch {
+          case e: InterruptedException =>
+            throw e
+          case e: Exception =>
+            logError("Exception while merging application listings", e)
+        } finally {
+          pendingReplayTasksCount.decrementAndGet()
+        }
+      }
+
+      lastScanTime.set(newLastScanTime)
+      metrics.updateLastSucceeded.setValue(newLastScanTime)
+    } catch {
+      case e: Exception => logError(
--- End diff --

done
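
For readers skimming the diff: the patch threads two metrics helpers through checkForLogs(). The whole scan runs inside time(metrics.updateTimer) { ... }, and each per-file replay runs inside time(metrics.historyMergeTimer, Some(metrics.historyTotalMergeTime)) { ... }. The time(...) helper itself is outside this hunk; the Scala sketch below is a minimal illustration of the pattern, assuming Codahale's com.codahale.metrics.Timer and Counter. The signature mirrors the two call sites above, but the body is an assumption, not the patch's actual implementation:

    import com.codahale.metrics.{Counter, Timer}

    // Minimal sketch (assumed implementation): run a block of work under a
    // Timer and, optionally, add the elapsed milliseconds to a running-total
    // counter.
    private def time[T](timer: Timer, totalTime: Option[Counter] = None)(fn: => T): T = {
      val context = timer.time()   // start the timer's clock
      try {
        fn
      } finally {
        // Timer.Context.stop() returns the elapsed time in nanoseconds.
        val elapsedNanos = context.stop()
        totalTime.foreach(_.inc(elapsedNanos / 1000000L))
      }
    }

With a try/finally shape like this, the timers record even when the scan or a mergeApplicationListing(file) call throws. Note also the attempted/succeeded pairing visible in the diff: metrics.updateLastAttempted.touch() fires at the start of every poll, while metrics.updateLastSucceeded.setValue(newLastScanTime) is only reached after a successful scan, so a widening gap between the two surfaces failing polls.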