dongjoon-hyun commented on code in PR #38983:
URL: https://github.com/apache/spark/pull/38983#discussion_r1048012538


##########
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala:
##########
@@ -1705,6 +1705,61 @@ abstract class FsHistoryProviderSuite extends 
SparkFunSuite with Matchers with P
     provider.stop()
   }
 
+  test("SPARK-41447: clean up expired event log files that don't exist in 
listing db") {
+    class TestFsHistoryProvider(conf: SparkConf, clock: Clock)
+      extends FsHistoryProvider(conf, clock) {
+      var doMergeApplicationListingCall = 0
+      override private[history] def doMergeApplicationListing(
+          reader: EventLogFileReader,
+          lastSeen: Long,
+          enableSkipToEnd: Boolean,
+          lastCompactionIndex: Option[Long]): Unit = {
+        super.doMergeApplicationListing(reader, lastSeen, enableSkipToEnd, 
lastCompactionIndex)
+        doMergeApplicationListingCall += 1
+      }
+    }
+
+    val maxAge = TimeUnit.SECONDS.toMillis(10)
+    val clock = new ManualClock(maxAge / 2)
+    val conf = createTestConf().set(MAX_LOG_AGE_S.key, 
s"${maxAge}ms").set(CLEANER_ENABLED, true)
+    val provider = new TestFsHistoryProvider(conf, clock)
+
+    val log1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+    writeFile(log1, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", 
Some("attempt1")),
+      SparkListenerApplicationEnd(2L)
+    )
+    log1.setLastModified(0L)
+
+    val log2 = newLogFile("app1", Some("attempt2"), inProgress = false)
+    writeFile(log2, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 2L, "test", 
Some("attempt2")),
+      SparkListenerApplicationEnd(4L)
+    )
+    log2.setLastModified(clock.getTimeMillis())
+
+    val log3 = newLogFile("app2", Some("attempt1"), inProgress = false)
+    writeFile(log3, None,
+      SparkListenerApplicationStart("app2", Some("app1"), 3L, "test", 
Some("attempt1")),
+      SparkListenerApplicationEnd(4L)
+    )
+    log3.setLastModified(0L)
+
+    provider.getListing().size should be (0)
+
+    // Move the clock forward so log1 and log3 exceed the max age.
+    clock.advance(maxAge)
+    // Avoid unnecessary parse, the expired log files would be cleaned by 
checkForLogs().
+    provider.checkForLogs()
+
+    provider.doMergeApplicationListingCall should be (1)
+    provider.getListing().size should be (1)

Review Comment:
   This new test code fails here on the master branch, but why it doesn't fail 
at line 1758 ~ 1760? 
   If then, could you explain why this PR claims to `clean up expired event 
log` instead of reducing the number of `doMergeApplicationListCall` 
invocations? I'm wondering if I miss something?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to