Github user ankuriitg commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22504#discussion_r226710526

    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -800,14 +817,33 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         stale.foreach { log =>
           if (log.appId.isEmpty) {
             logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
    -        deleteLog(new Path(log.logPath))
    +        deleteLog(fs, new Path(log.logPath))
             listing.delete(classOf[LogInfo], log.logPath)
           }
         }

         // Clean the blacklist from the expired entries.
         clearBlacklist(CLEAN_INTERVAL_S)
       }

    +  /**
    +   * Delete driver logs from the configured spark dfs dir that exceed the configured max age
    +   */
    +  private[history] def cleanDriverLogs(): Unit = Utils.tryLog {
    +    val driverLogDir = conf.get(DRIVER_LOG_DFS_DIR)
    +    driverLogDir.foreach { dl =>
    +      val maxTime = clock.getTimeMillis() -
    +        conf.get(MAX_DRIVER_LOG_AGE_S) * 1000
    +      val appDirs = driverLogFs.get.listLocatedStatus(new Path(dl))
    +      while (appDirs.hasNext()) {
    +        val appDirStatus = appDirs.next()
    +        if (appDirStatus.getModificationTime() < maxTime) {
    +          logInfo(s"Deleting expired driver log for: ${appDirStatus.getPath().getName()}")
    +          deleteLog(driverLogFs.get, appDirStatus.getPath())
    --- End diff --

Sorry, I don't understand your comment. Can you elaborate, please?

The problem you described was that the old cleaner logic may delete (or try to delete) the log of an active app. With the solution I added, I track both the fileLength and the lastProcessedTime. If the fileLength has changed, I do not delete the file; I update the lastProcessedTime instead. I then rely on lastProcessedTime to determine whether it is time to delete the file. Please let me know if there is an issue in this logic.
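For readers following the thread, here is a minimal sketch of the tracking scheme the comment describes, assuming a hypothetical `DriverLogInfo` record and `maybeDelete` helper; the names and class structure are illustrative and are not the PR's actual implementation (only the Hadoop `FileSystem` calls are real APIs):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.mutable

// Hypothetical record of what the cleaner last observed for a driver log.
case class DriverLogInfo(fileLength: Long, lastProcessedTime: Long)

// Illustrative cleaner sketch: delete a driver log only after it has
// stopped growing for longer than the configured max age.
class DriverLogCleanerSketch(fs: FileSystem, maxAgeMs: Long) {

  // Last observed length and processing time per log path.
  private val tracked = mutable.Map[Path, DriverLogInfo]()

  def maybeDelete(logPath: Path, now: Long): Unit = {
    val currentLength = fs.getFileStatus(logPath).getLen
    tracked.get(logPath) match {
      case Some(info) if info.fileLength != currentLength =>
        // The file is still being written to (active app): keep it and
        // refresh lastProcessedTime so the age clock restarts.
        tracked(logPath) = DriverLogInfo(currentLength, now)
      case Some(info) if now - info.lastProcessedTime > maxAgeMs =>
        // Unchanged since lastProcessedTime and older than the max age:
        // safe to delete.
        fs.delete(logPath, true)
        tracked.remove(logPath)
      case Some(_) =>
        // Unchanged but not yet expired: leave it for a later cleaner pass.
      case None =>
        // First time this log is seen: start tracking it.
        tracked(logPath) = DriverLogInfo(currentLength, now)
    }
  }
}
```

The key property of this scheme is that a growing file resets its own age clock, so an active app's log is never considered expired while it is still being written to.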