Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19770#discussion_r154817305 --- Diff: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala --- @@ -658,6 +659,103 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc freshUI.get.ui.store.job(0) } + /** + * Validate aggressive clean up removes incomplete or corrupt history files that would + * otherwise be missed during clean up. Also validate no behavior change if aggressive + * clean up is disabled. + */ + test("SPARK-21571: aggressive clean up removes incomplete history files") { + createCleanAndCheckIncompleteLogFiles(14, 21, true, true, true, true, true) + createCleanAndCheckIncompleteLogFiles(14, 21, false, false, false, true, false) + createCleanAndCheckIncompleteLogFiles(14, 7, true, false, false, false, false) + createCleanAndCheckIncompleteLogFiles(14, 7, false, false, false, false, false) + } + + /** + * Create four test incomplete/corrupt history files and invoke a check and clean cycle that + * passes followed by one occurring after the max age days retention window and assert the + * expected number of history files remain. 
+ * @param maxAgeDays maximum retention in days, used to simulate current time + * @param lastModifiedDaysAgo last modified date for test files relative to current time + * @param aggressiveCleanup aggressive clean up is enabled or not + * @param expectEmptyInprogressRemoved expect an empty inprogress file to be removed + * @param expectEmptyCorruptRemoved expect an empty corrupt complete file to be removed + * @param expectNonEmptyInprogressRemoved expect a non-empty inprogress file to be removed + * @param expectNonEmptyCorruptRemoved expect a non-empty corrupt complete file to be removed + */ + private def createCleanAndCheckIncompleteLogFiles( + maxAgeDays: Long, + lastModifiedDaysAgo: Long, + aggressiveCleanup: Boolean, + expectEmptyInprogressRemoved: Boolean, + expectEmptyCorruptRemoved: Boolean, + expectNonEmptyInprogressRemoved: Boolean, + expectNonEmptyCorruptRemoved: Boolean) = { + // Set current time as 2 * maximum retention period to allow for expired history files. + val currentTimeMillis = MILLISECONDS.convert(maxAgeDays * 2, TimeUnit.DAYS) + val clock = new ManualClock(currentTimeMillis) + + val lastModifiedTime = currentTimeMillis - + MILLISECONDS.convert(lastModifiedDaysAgo, TimeUnit.DAYS) + + val provider = new FsHistoryProvider( + createTestConf() + .set("spark.history.fs.cleaner.aggressive", s"${aggressiveCleanup}") + .set("spark.history.fs.cleaner.maxAge", s"${maxAgeDays}d") + .set("spark.testing", "true"), + clock) { + override def getNewLastScanTime(): Long = clock.getTimeMillis + } + + // Create history files + // 1. 0-byte size files inprogress and corrupt complete files + // 2. >0 byte size files inprogress and corrupt complete files --- End diff -- So this is what I referred to in a previous comment. You probably need another test here where you have an in-progress, valid file that is being appended to, but where the time since its last mod time exceeds the cleanup period.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org