dongjoon-hyun commented on code in PR #38983: URL: https://github.com/apache/spark/pull/38983#discussion_r1048012538
########## core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala: ########## @@ -1705,6 +1705,61 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P provider.stop() } + test("SPARK-41447: clean up expired event log files that don't exist in listing db") { + class TestFsHistoryProvider(conf: SparkConf, clock: Clock) + extends FsHistoryProvider(conf, clock) { + var doMergeApplicationListingCall = 0 + override private[history] def doMergeApplicationListing( + reader: EventLogFileReader, + lastSeen: Long, + enableSkipToEnd: Boolean, + lastCompactionIndex: Option[Long]): Unit = { + super.doMergeApplicationListing(reader, lastSeen, enableSkipToEnd, lastCompactionIndex) + doMergeApplicationListingCall += 1 + } + } + + val maxAge = TimeUnit.SECONDS.toMillis(10) + val clock = new ManualClock(maxAge / 2) + val conf = createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms").set(CLEANER_ENABLED, true) + val provider = new TestFsHistoryProvider(conf, clock) + + val log1 = newLogFile("app1", Some("attempt1"), inProgress = false) + writeFile(log1, None, + SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")), + SparkListenerApplicationEnd(2L) + ) + log1.setLastModified(0L) + + val log2 = newLogFile("app1", Some("attempt2"), inProgress = false) + writeFile(log2, None, + SparkListenerApplicationStart("app1", Some("app1"), 2L, "test", Some("attempt2")), + SparkListenerApplicationEnd(4L) + ) + log2.setLastModified(clock.getTimeMillis()) + + val log3 = newLogFile("app2", Some("attempt1"), inProgress = false) + writeFile(log3, None, + SparkListenerApplicationStart("app2", Some("app1"), 3L, "test", Some("attempt1")), + SparkListenerApplicationEnd(4L) + ) + log3.setLastModified(0L) + + provider.getListing().size should be (0) + + // Move the clock forward so log1 and log3 exceed the max age. + clock.advance(maxAge) + // Avoid unnecessary parse, the expired log files would be cleaned by checkForLogs(). 
+ provider.checkForLogs() + + provider.doMergeApplicationListingCall should be (1) + provider.getListing().size should be (1) Review Comment: This new test code fails here on the master branch, but why doesn't it fail at lines 1758 ~ 1760? If so, could you explain why this PR claims to `clean up expired event log` instead of reducing the number of `doMergeApplicationListingCall` invocations? I'm wondering if I'm missing something. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org