Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19770#discussion_r154816346
  
    --- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 ---
    @@ -658,6 +659,103 @@ class FsHistoryProviderSuite extends SparkFunSuite 
with BeforeAndAfter with Matc
         freshUI.get.ui.store.job(0)
       }
     
    +  /**
    +   * Validate aggressive clean up removes incomplete or corrupt history 
files that would
    +   * otherwise be missed during clean up.  Also validate no behavior 
change if aggressive
    +   * clean up is disabled.
    +   */
    +  test("SPARK-21571: aggressive clean up removes incomplete history 
files") {
    +    createCleanAndCheckIncompleteLogFiles(14, 21, true, true, true, true, 
true)
    +    createCleanAndCheckIncompleteLogFiles(14, 21, false, false, false, 
true, false)
    +    createCleanAndCheckIncompleteLogFiles(14, 7, true, false, false, 
false, false)
    +    createCleanAndCheckIncompleteLogFiles(14, 7, false, false, false, 
false, false)
    +  }
    +
    +  /**
    +   * Create four test incomplete/corrupt history files and invoke a check 
and clean cycle that
    +   * passes followed by one occurring after the max age days retention 
window and assert the
    +   * expected number of history files remain.
    +   * @param maxAgeDays maximum retention in days, used to simulate current 
time
    +   * @param lastModifiedDaysAgo last modified date for test files relative 
to current time
    +   * @param aggressiveCleanup whether aggressive clean up is enabled
    +   * @param expectEmptyInprogressRemoved expect an empty inprogress file 
to be removed
    +   * @param expectEmptyCorruptRemoved expect an empty corrupt complete 
file to be removed
    +   * @param expectNonEmptyInprogressRemoved expect a non-empty inprogress 
file to be removed
    +   * @param expectNonEmptyCorruptRemoved expect a non-empty corrupt 
complete file to be removed
    +   */
    +  private def createCleanAndCheckIncompleteLogFiles(
    +      maxAgeDays: Long,
    +      lastModifiedDaysAgo: Long,
    +      aggressiveCleanup: Boolean,
    +      expectEmptyInprogressRemoved: Boolean,
    +      expectEmptyCorruptRemoved: Boolean,
    +      expectNonEmptyInprogressRemoved: Boolean,
    +      expectNonEmptyCorruptRemoved: Boolean) = {
    +    // Set current time as 2 * maximum retention period to allow for 
expired history files.
    +    val currentTimeMillis = MILLISECONDS.convert(maxAgeDays * 2, 
TimeUnit.DAYS)
    +    val clock = new ManualClock(currentTimeMillis)
    +
    +    val lastModifiedTime = currentTimeMillis -
    +      MILLISECONDS.convert(lastModifiedDaysAgo, TimeUnit.DAYS)
    +
    +    val provider = new FsHistoryProvider(
    +      createTestConf()
    +        .set("spark.history.fs.cleaner.aggressive", 
s"${aggressiveCleanup}")
    +        .set("spark.history.fs.cleaner.maxAge", s"${maxAgeDays}d")
    +        .set("spark.testing", "true"),
    +      clock) {
    +      override def getNewLastScanTime(): Long = clock.getTimeMillis
    +    }
    +
    +    // Create history files
    +    // 1. 0-byte size files inprogress and corrupt complete files
    +    // 2. >0 byte size files inprogress and corrupt complete files
    +
    +    try {
    +      val logfile1 = newLogFile("emptyInprogressLogFile", None, inProgress 
= true)
    +      logfile1.createNewFile
    +      logfile1.setLastModified(lastModifiedTime)
    +
    +      val logfile2 = newLogFile("emptyCorruptLogFile", None, inProgress = 
false)
    +      logfile2.createNewFile
    +      logfile2.setLastModified(lastModifiedTime)
    +
    +      // Create an inprogress log file, has only start record.
    +      val logfile3 = newLogFile("nonEmptyInprogressLogFile", None, 
inProgress = true)
    +      writeFile(logfile3, true, None, SparkListenerApplicationStart(
    +        "inProgress1", Some("inProgress1"), 3L, "test", Some("attempt1"))
    +      )
    +      logfile3.setLastModified(lastModifiedTime)
    +
    +      // Create an incomplete log file, has an end record but no start 
record.
    +      val logfile4 = newLogFile("nonEmptyCorruptLogFile", None, inProgress 
= false)
    +      writeFile(logfile4, true, None, SparkListenerApplicationEnd(0))
    +      logfile4.setLastModified(lastModifiedTime)
    +
    +      // Simulate checking logs 1 day after initial creation.  This is 
necessary because the log
    +      // checker will sometimes use the current time in place of last 
modified time the first
    +      // time it encounters an inprogress file to work around certain file 
system inconsistencies.
    +      // No history files should clean up in first check and clean pass.
    +      clock.setTime(lastModifiedTime + MILLISECONDS.convert(1, 
TimeUnit.DAYS))
    +      provider.checkForLogs
    --- End diff --
    
    Add `()` to method calls (also in other places).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to