Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19770#discussion_r154816346 --- Diff: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala --- @@ -658,6 +659,103 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc freshUI.get.ui.store.job(0) } + /** + * Validate aggressive clean up removes incomplete or corrupt history files that would + * otherwise be missed during clean up. Also validate no behavior change if aggressive + * clean up is disabled. + */ + test("SPARK-21571: aggressive clean up removes incomplete history files") { + createCleanAndCheckIncompleteLogFiles(14, 21, true, true, true, true, true) + createCleanAndCheckIncompleteLogFiles(14, 21, false, false, false, true, false) + createCleanAndCheckIncompleteLogFiles(14, 7, true, false, false, false, false) + createCleanAndCheckIncompleteLogFiles(14, 7, false, false, false, false, false) + } + + /** + * Create four test incomplete/corrupt history files and invoke a check and clean cycle that + * passes followed by one occurring after the max age days retention window and assert the + * expected number of history files remain. 
+ * @param maxAgeDays maximum retention in days, used to simulate current time + * @param lastModifiedDaysAgo last modified date for test files relative to current time + * @param aggressiveCleanup aggressive clean up is enabled or not + * @param expectEmptyInprogressRemoved expect an empty inprogress file to be removed + * @param expectEmptyCorruptRemoved expect an empty corrupt complete file to be removed + * @param expectNonEmptyInprogressRemoved expect a non-empty inprogress file to be removed + * @param expectNonEmptyCorruptRemoved expect a non-empty corrupt complete file to be removed + */ + private def createCleanAndCheckIncompleteLogFiles( + maxAgeDays: Long, + lastModifiedDaysAgo: Long, + aggressiveCleanup: Boolean, + expectEmptyInprogressRemoved: Boolean, + expectEmptyCorruptRemoved: Boolean, + expectNonEmptyInprogressRemoved: Boolean, + expectNonEmptyCorruptRemoved: Boolean) = { + // Set current time as 2 * maximum retention period to allow for expired history files. + val currentTimeMillis = MILLISECONDS.convert(maxAgeDays * 2, TimeUnit.DAYS) + val clock = new ManualClock(currentTimeMillis) + + val lastModifiedTime = currentTimeMillis - + MILLISECONDS.convert(lastModifiedDaysAgo, TimeUnit.DAYS) + + val provider = new FsHistoryProvider( + createTestConf() + .set("spark.history.fs.cleaner.aggressive", s"${aggressiveCleanup}") + .set("spark.history.fs.cleaner.maxAge", s"${maxAgeDays}d") + .set("spark.testing", "true"), + clock) { + override def getNewLastScanTime(): Long = clock.getTimeMillis + } + + // Create history files + // 1. 0-byte size files inprogress and corrupt complete files + // 2. 
>0 byte size files inprogress and corrupt complete files + + try { + val logfile1 = newLogFile("emptyInprogressLogFile", None, inProgress = true) + logfile1.createNewFile + logfile1.setLastModified(lastModifiedTime) + + val logfile2 = newLogFile("emptyCorruptLogFile", None, inProgress = false) + logfile2.createNewFile + logfile2.setLastModified(lastModifiedTime) + + // Create an inprogress log file, has only start record. + val logfile3 = newLogFile("nonEmptyInprogressLogFile", None, inProgress = true) + writeFile(logfile3, true, None, SparkListenerApplicationStart( + "inProgress1", Some("inProgress1"), 3L, "test", Some("attempt1")) + ) + logfile3.setLastModified(lastModifiedTime) + + // Create an incomplete log file, has an end record but no start record. + val logfile4 = newLogFile("nonEmptyCorruptLogFile", None, inProgress = false) + writeFile(logfile4, true, None, SparkListenerApplicationEnd(0)) + logfile4.setLastModified(lastModifiedTime) + + // Simulate checking logs 1 day after initial creation. This is necessary because the log + // checker will sometimes use the current time in place of last modified time the first + // time it encounters an inprogress file to work around certain file system inconsistencies. + // No history files should clean up in first check and clean pass. + clock.setTime(lastModifiedTime + MILLISECONDS.convert(1, TimeUnit.DAYS)) + provider.checkForLogs --- End diff -- Add `()` to method calls (also in other places).
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org